# Script index:
#   Training data pipeline: convert, export, extract, load-to-db
#   Infra tooling: infra-audit, infra-gitea-link
#   RAG pipeline: rag-ingest, rag-query
#   Fine-tuning: finetune-lora, overnight-qwen3, install-unsloth
#   Transcripts: export-transcripts
#   Updated README with script index and token reduction strategy
#!/usr/bin/env python3
"""
Improved session extractor that handles tool-interleaved conversations.

Session structure in OpenClaw JSONL:

    user: question (text)
    assistant: narration + toolCall blocks   <- intermediate, not training data
    user: toolResult blocks                  <- intermediate
    assistant: final answer (text)           <- THIS is what we want

Strategy: for each user text turn, find the LAST assistant text turn
before the next user text turn. That's the real response.
"""
import json
|
|
import os
|
|
import re
|
|
import glob
|
|
from datetime import datetime, timezone
|
|
|
|
# Noise patterns to strip from raw user messages.
# Each pattern matches boilerplate the chat runtime injects around the real
# human text; clean_user_text() deletes every match before export.
NOISE_PATTERNS = [
    # "... (untrusted metadata):" headers followed by a JSON blob ending in "}".
    re.compile(r'Conversation info \(untrusted metadata\):.*?^\s*\}\s*\n', re.DOTALL | re.MULTILINE),
    re.compile(r'Sender \(untrusted metadata\):.*?^\s*\}\s*\n', re.DOTALL | re.MULTILINE),
    # Memory-recall context the agent framework prepends to the message.
    re.compile(r'<relevant-memories>.*?</relevant-memories>', re.DOTALL),
    # Attachment placeholders, e.g. "[media attached: photo.jpg]".
    re.compile(r'\[media attached:.*?\]'),
    # Fenced ```json blocks that carry a "schema" payload.
    re.compile(r'```json\s*\{.*?"schema".*?\}\s*```', re.DOTALL),
    # Quoted-reply metadata; the blob ends with two closing braces.
    re.compile(r'Replied message \(untrusted.*?\}\s*\n\s*\}\s*\n', re.DOTALL),
    # Banner added when messages queued while the agent was busy, up to the
    # next blank line (or end of text).
    re.compile(r'\[Queued messages while agent was busy\].*?(?=\n\n|\Z)', re.DOTALL),
    # Per-message queue separators: "---\nQueued #3\n".
    re.compile(r'---\s*\nQueued #\d+\s*\n'),
    re.compile(r'^```\s*\n```\s*\n', re.MULTILINE),  # empty code blocks
]
|
|
|
|
|
|
def clean_user_text(text: str) -> str:
    """Strip runtime-injected noise from a raw user message and tidy whitespace."""
    cleaned = text
    for noise in NOISE_PATTERNS:
        cleaned = noise.sub("", cleaned)
    # Collapse runs of three or more newlines down to one blank line.
    cleaned = re.sub(r'\n{3,}', '\n\n', cleaned).strip()
    # Drop leading backtick/whitespace remnants left behind by removed fences.
    return re.sub(r'^[\s`]+', '', cleaned).strip()
|
|
|
|
# Where the raw OpenClaw session transcripts live.
SESSIONS_DIR = os.path.expanduser("~/.openclaw/agents/main/sessions/")
# Destination directory for exported training JSONL files.
OUTPUT_DIR = "/mnt/ai-storage/grace/training-data/jsonl"
# Incremental-export bookkeeping: which sessions were already processed.
STATE_FILE = os.path.expanduser("~/self-improving/export-state.json")

# System prompt prepended (as the first "system" turn) to every exported
# conversation.
SYSTEM_PROMPT = (
    "You are Grace, a Culture Mind-class AI assistant and trusted companion for Maxwell. "
    "You are warm, direct, witty, and proactive. You support Maxwell's ADHD executive function, "
    "manage his homelab, help with job searching, and operate local AI infrastructure. "
    "You speak plainly — no corporate pleasantries, no hedging. "
    "You use exec and local tools proactively and return real results. Never fabricate output."
)

# User turns starting with any of these prefixes are automation/heartbeat
# noise, not human input — skipped during pairing.
SKIP_USER = (
    "HEARTBEAT_OK", "NO_REPLY", "Read HEARTBEAT.md",
    "A new session was started", "[Queued messages",
    "Run your Session Startup", "Conversation info (untrusted",
)

# Assistant turns starting with these prefixes are status chatter — skipped.
SKIP_ASSISTANT = (
    "HEARTBEAT_OK", "NO_REPLY",
    "✅ New session started",
)

# Minimum character lengths for a turn to count as substantive.
MIN_USER_CHARS = 15
MIN_ASST_CHARS = 40
|
|
|
|
|
|
def get_text(content: list) -> str:
    """Flatten a message's content blocks into one space-joined text string.

    Plain strings are taken verbatim; dict blocks contribute only when their
    type is "text". Everything else (toolCall, toolResult, ...) is ignored.
    """
    fragments = []
    for item in content:
        if isinstance(item, str):
            fragments.append(item)
        elif isinstance(item, dict) and item.get("type") == "text":
            fragments.append(item.get("text", ""))
    return " ".join(fragments).strip()
|
|
|
|
|
|
def has_tool_result(content: list) -> bool:
    """Return True when this message carries tool output, not real user text."""
    for block in content:
        if not isinstance(block, dict):
            continue
        kind = block.get("type", "")
        if kind in ("toolResult", "tool_result"):
            return True
        # Also catches the pattern where the whole content is a tool result
        # object: tool results often start with serialized JSON / exec output.
        if kind == "text" and block.get("text", "").startswith(
            ('{\n "url":', '{\n "error":')
        ):
            return True
    return False
|
|
|
|
|
|
def is_tool_result_message(content: list) -> bool:
    """Heuristically detect user messages that are purely tool output.

    Such messages carry no conversational text — just raw tool return values
    (JSON payloads, markdown front-matter, exec output). Flattening of the
    text blocks is done inline with the same contract as get_text().
    """
    flat = " ".join(
        piece if isinstance(piece, str) else piece.get("text", "")
        for piece in content
        if isinstance(piece, str)
        or (isinstance(piece, dict) and piece.get("type") == "text")
    ).strip()

    # JSON payload opening with a quoted key, mentioning "url" near the top.
    if flat.startswith('{\n "') and '"url"' in flat[:50]:
        return True
    # Markdown front-matter, or a long markdown document.
    if flat.startswith("---\nname:") or (flat.startswith("# ") and len(flat) > 500):
        return True
    # Multi-line with no conversational content (exec output etc.): several
    # lines but no lowercase letters in the opening characters.
    if len(flat.strip().split('\n')) > 3 and not any(
        ch.isalpha() and ch.islower() for ch in flat[:30]
    ):
        return True
    return False
|
|
|
|
|
|
def _load_messages(path: str) -> list:
    """Read one session JSONL file into [{"role": ..., "content": ...}, ...].

    Blank lines, malformed JSON lines, non-"message" records, and records
    missing a role or content are silently skipped. If the file itself cannot
    be read, the error is printed and [] is returned (best-effort pipeline).
    """
    messages = []
    try:
        with open(path) as f:
            for line in f:
                line = line.strip()
                if not line:
                    continue
                try:
                    record = json.loads(line)
                except json.JSONDecodeError:
                    # Truncated/corrupt line — skip it, keep the rest.
                    continue
                if record.get("type") != "message":
                    continue
                inner = record.get("message", {})
                role = inner.get("role")
                content = inner.get("content", [])
                if role and content:
                    messages.append({"role": role, "content": content})
    except Exception as e:
        print(f" Error: {e}")
        return []
    return messages


def extract_pairs(path: str):
    """
    Parse session, return list of (user_text, assistant_text) pairs.

    Each pair = real human question + final assistant answer (after tool use):
    for each real user text turn, the LAST qualifying assistant text turn
    before the next real user turn is taken as the answer.
    """
    messages = _load_messages(path)

    pairs = []
    i = 0
    while i < len(messages):
        msg = messages[i]

        # Only user turns can open a pair.
        if msg["role"] != "user":
            i += 1
            continue

        # Tool output routed through the user role is not human input.
        if has_tool_result(msg["content"]):
            i += 1
            continue

        # Clean metadata noise from user text before the substance checks.
        user_text = clean_user_text(get_text(msg["content"]))

        if len(user_text) < MIN_USER_CHARS or any(
            user_text.startswith(s) for s in SKIP_USER
        ):
            i += 1
            continue

        # Look forward: scan until the next real user turn; the LAST
        # non-trivial assistant text seen on the way is the final answer.
        j = i + 1
        last_asst_text = None
        next_real_user = None

        while j < len(messages):
            m = messages[j]
            if m["role"] == "user":
                # NOTE(review): this check uses the RAW user text, while the
                # outer check above cleans it first — a cleaned-short message
                # can therefore still terminate the scan here. Preserved as-is;
                # confirm whether the asymmetry is intentional.
                u_text = get_text(m["content"])
                if (
                    not has_tool_result(m["content"])
                    and len(u_text) >= MIN_USER_CHARS
                    and not any(u_text.startswith(s) for s in SKIP_USER)
                ):
                    next_real_user = j
                    break
            elif m["role"] == "assistant":
                t = get_text(m["content"])
                if len(t) >= MIN_ASST_CHARS and not any(
                    t.startswith(s) for s in SKIP_ASSISTANT
                ):
                    last_asst_text = t
            j += 1

        if last_asst_text:
            pairs.append((user_text, last_asst_text))

        # Jump to the next real user turn (or past the scanned region).
        i = next_real_user if next_real_user is not None else j

    return pairs
|
|
|
|
|
|
def load_state():
    """Load export bookkeeping from STATE_FILE.

    Returns the parsed JSON dict, or a fresh default state when the file does
    not exist yet (first run). Uses EAFP (try/open) rather than an
    exists-then-open check, which avoids the race between the two calls.
    Any other I/O or JSON error still propagates, same as before.
    """
    try:
        with open(STATE_FILE) as f:
            return json.load(f)
    except FileNotFoundError:
        return {"exported_sessions": [], "last_run": None, "total_examples": 0}
|
|
|
|
|
|
def save_state(state):
    """Persist the export bookkeeping dict to STATE_FILE as indented JSON."""
    payload = json.dumps(state, indent=2)
    with open(STATE_FILE, "w") as fh:
        fh.write(payload)
|
|
|
|
|
|
def main():
    """Export all not-yet-processed sessions as ShareGPT-style JSONL."""
    state = load_state()
    os.makedirs(OUTPUT_DIR, exist_ok=True)

    # All session transcripts, excluding ".reset." snapshots.
    session_files = sorted(
        path
        for path in glob.glob(os.path.join(SESSIONS_DIR, "*.jsonl"))
        if ".reset." not in path
    )

    fresh_examples = []
    fresh_sessions = []

    for session_path in session_files:
        session_id = os.path.basename(session_path).replace(".jsonl", "")
        if session_id in state["exported_sessions"]:
            continue

        pairs = extract_pairs(session_path)
        if len(pairs) < 2:
            # Too thin to be a useful conversation — mark done and move on.
            print(f" {session_id[:8]}: skipped ({len(pairs)} pairs)")
            state["exported_sessions"].append(session_id)
            continue

        convo = [{"from": "system", "value": SYSTEM_PROMPT}]
        for question, answer in pairs:
            convo.append({"from": "human", "value": question})
            convo.append({"from": "gpt", "value": answer})

        fresh_examples.append({
            "conversations": convo,
            "source": os.path.basename(session_path),
            "exported_at": datetime.now(timezone.utc).isoformat(),
        })
        fresh_sessions.append(session_id)
        print(f" {session_id[:8]}: {len(pairs)} pairs ✓")

    if not fresh_examples:
        print("No new sessions to export.")
        state["exported_sessions"].extend(fresh_sessions)
        save_state(state)
        return

    stamp = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S")
    output_file = os.path.join(OUTPUT_DIR, f"grace_training_{stamp}.jsonl")
    with open(output_file, "w") as out:
        out.writelines(json.dumps(example) + "\n" for example in fresh_examples)

    state["exported_sessions"].extend(fresh_sessions)
    state["last_run"] = datetime.now(timezone.utc).isoformat()
    state["total_examples"] = state.get("total_examples", 0) + len(fresh_examples)
    save_state(state)

    print(f"\nWrote {len(fresh_examples)} examples → {output_file}")
    print(f"Total examples to date: {state['total_examples']}")
|
|
|
|
|
|
# Script entry point: run the export only when invoked directly.
if __name__ == "__main__":
    main()
|