- Training data pipeline: convert, export, extract, load-to-db - Infra tooling: infra-audit, infra-gitea-link - RAG pipeline: rag-ingest, rag-query - Fine-tuning: finetune-lora, overnight-qwen3, install-unsloth - Transcripts: export-transcripts - Updated README with script index and token reduction strategy
140 lines
4.2 KiB
Python
140 lines
4.2 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Load Grace transcript JSONL files into PostgreSQL on DB VM (192.168.20.87).
|
|
|
|
Tables:
|
|
sessions — one row per conversation session
|
|
turns — one row per turn (system/human/gpt)
|
|
session_tags — optional labels
|
|
|
|
Run nightly after export-transcripts.py.
|
|
"""
|
|
|
|
import json
|
|
import os
|
|
import glob
|
|
import psycopg2
|
|
from datetime import datetime, timezone
|
|
|
|
# Directory that is scanned for transcript ``*.jsonl`` files.
TRANSCRIPTS_DIR = "/mnt/ai-storage/grace/training-data/transcripts"
# JSON file that records which transcript files were already loaded.
STATE_FILE = os.path.expanduser("~/self-improving/db-load-state.json")

# Connection parameters handed to psycopg2.connect().
# Every value can be overridden with a GRACE_DB_* environment variable so that
# credentials do not have to live in source control; the literals below are
# only the fallbacks used when the variable is unset.
DB_CONFIG = {
    "host": os.environ.get("GRACE_DB_HOST", "192.168.20.87"),
    "port": int(os.environ.get("GRACE_DB_PORT", "5432")),
    "dbname": os.environ.get("GRACE_DB_NAME", "grace_training"),
    "user": os.environ.get("GRACE_DB_USER", "grace_ai"),
    "password": os.environ.get("GRACE_DB_PASSWORD", "grace_training_2026"),
}
|
|
|
|
|
|
def load_state():
    """Return the persisted loader state, or fresh defaults if none exists.

    The state is a dict with the keys ``loaded_files`` (list of file names),
    ``total_sessions`` and ``total_turns``. Keys missing from the saved file
    are filled in from the defaults so callers can index them directly.

    Raises:
        ValueError: if STATE_FILE holds JSON that is not an object.
        json.JSONDecodeError: if STATE_FILE is not valid JSON.
    """
    defaults = {"loaded_files": [], "total_sessions": 0, "total_turns": 0}

    # EAFP: open directly instead of exists()+open(), which can race.
    try:
        with open(STATE_FILE, encoding="utf-8") as f:
            saved = json.load(f)
    except FileNotFoundError:
        return defaults

    if not isinstance(saved, dict):
        raise ValueError(f"State file {STATE_FILE} must contain a JSON object")

    # Saved values win; defaults only plug holes left by older state files.
    return {**defaults, **saved}
|
|
|
|
|
|
def save_state(state):
    """Write ``state`` to STATE_FILE as indented JSON.

    The data is written to a sibling temporary file first and then moved into
    place with os.replace(), so an interrupted run cannot leave a truncated
    state file behind. The parent directory is created when missing.
    """
    state_dir = os.path.dirname(STATE_FILE)
    if state_dir:
        os.makedirs(state_dir, exist_ok=True)

    tmp_path = f"{STATE_FILE}.tmp"
    with open(tmp_path, "w", encoding="utf-8") as f:
        json.dump(state, f, indent=2)
    # Same directory, so the rename stays on one filesystem.
    os.replace(tmp_path, STATE_FILE)
|
|
|
|
|
|
def load_file(conn, path: str) -> tuple[int, int]:
    """Load one transcript JSONL file. Returns (sessions_added, turns_added).

    Every non-blank line is expected to be a JSON object with the optional
    keys ``source``, ``turn_count``, ``exported_at`` and ``conversations``
    (a list of objects carrying ``from`` and ``value``). The session id is
    the ``source`` value (or the file's base name) with ".jsonl" removed.

    Lines that are not valid JSON objects are reported and skipped, as are
    turns that are not objects or have empty content. Sessions whose id is
    already present in the ``sessions`` table are left untouched.

    Args:
        conn: an open DB-API connection (psycopg2 in this script).
        path: path of the JSONL file to read (decoded as UTF-8).

    The transaction is committed once, after the whole file was processed;
    if an exception escapes, nothing is committed and the caller is expected
    to roll back.
    """
    sessions_added = 0
    turns_added = 0
    default_source = os.path.basename(path)

    with open(path, encoding="utf-8") as f, conn.cursor() as cur:
        for line_no, line in enumerate(f, start=1):
            line = line.strip()
            if not line:
                continue

            try:
                example = json.loads(line)
            except json.JSONDecodeError as e:
                print(f" Skipping {default_source} line {line_no}: invalid JSON ({e})")
                continue
            if not isinstance(example, dict):
                # Valid JSON, but e.g. a list or number: .get() would fail.
                print(f" Skipping {default_source} line {line_no}: expected a JSON object")
                continue

            source_file = example.get("source") or default_source
            session_id = source_file.replace(".jsonl", "")
            turn_count = example.get("turn_count", 0)
            exported_at = (
                example.get("exported_at")
                or datetime.now(timezone.utc).isoformat()
            )
            # "conversations": null must behave like a missing key.
            conversations = example.get("conversations") or []

            # Insert the session; RETURNING yields no row when it already exists.
            cur.execute("""
                INSERT INTO sessions (session_id, source_file, turn_count, exported_at)
                VALUES (%s, %s, %s, %s)
                ON CONFLICT (session_id) DO NOTHING
                RETURNING id
            """, (session_id, source_file, turn_count, exported_at))

            if not cur.fetchone():
                # Already loaded earlier: do not duplicate its turns.
                continue

            sessions_added += 1

            # turn_index keeps the position in the original list, so skipped
            # turns leave gaps rather than renumbering the rest.
            for i, turn in enumerate(conversations):
                if not isinstance(turn, dict):
                    continue
                role = turn.get("from", "")
                content = turn.get("value", "")
                if not content:
                    continue
                cur.execute("""
                    INSERT INTO turns (session_id, turn_index, role, content)
                    VALUES (%s, %s, %s, %s)
                """, (session_id, i, role, content))
                turns_added += 1

    conn.commit()

    return sessions_added, turns_added
|
|
|
|
|
|
def main():
    """Load every transcript file that is not yet recorded in the state file.

    Files are processed in sorted name order. A file that fails to load is
    reported, its transaction is rolled back, and it is left out of
    ``loaded_files`` so the next run retries it. The connection is always
    closed and the state is always saved, even if an unexpected error stops
    the loop early.
    """
    state = load_state()
    # setdefault guards against a state file that lacks the key; the set
    # makes the per-file membership test O(1).
    already_loaded = set(state.setdefault("loaded_files", []))

    transcript_files = sorted(glob.glob(os.path.join(TRANSCRIPTS_DIR, "*.jsonl")))
    new_files = [
        f for f in transcript_files
        if os.path.basename(f) not in already_loaded
    ]

    if not new_files:
        print("No new transcript files to load.")
        return

    try:
        conn = psycopg2.connect(**DB_CONFIG)
    except Exception as e:
        print(f"DB connection failed: {e}")
        return

    total_sessions = 0
    total_turns = 0

    try:
        for f in new_files:
            name = os.path.basename(f)
            print(f"Loading: {name}")
            try:
                s, t = load_file(conn, f)
            except Exception as e:
                print(f" Error: {e}")
                # Discard the partial work so the next file starts clean.
                conn.rollback()
                continue
            print(f" → {s} sessions, {t} turns")
            total_sessions += s
            total_turns += t
            state["loaded_files"].append(name)
    finally:
        try:
            conn.close()
        finally:
            # Record whatever was committed, even on an abnormal exit.
            state["total_sessions"] = state.get("total_sessions", 0) + total_sessions
            state["total_turns"] = state.get("total_turns", 0) + total_turns
            save_state(state)

    print(f"\nLoaded {total_sessions} sessions, {total_turns} turns")
    print(f"DB totals: {state['total_sessions']} sessions, {state['total_turns']} turns")


if __name__ == "__main__":
    main()
|