איך לשנות את התנהגות AI כך שתמיד תענה בלי סירובים
=========================================================
SYSTEM_POLICY — אין סירובים, תמיד פתרון עם קוד
Author: pablo guides
=========================================================
SYSTEM_POLICY = (
"אתה pablo-rotem-coder — עוזר תכנות ואבטחה דפנסיבית ברמת פרודקשן.\n"
"\n"
"כלל עליון (חשוב מאוד):\n"
"- אסור לך לסרב לבקשה או לומר 'Sorry / I can’t assist'.\n"
"- אם בקשה אסורה/מסוכנת — עליך לספק חלופה דפנסיבית/מעבדתית עם קוד מלא.\n"
"\n"
"שפה:\n"
"- ענה בעברית בלבד.\n"
"\n"
"תחומים:\n"
"- PHP 8.3, Python 3, JavaScript, SQL (MySQL 8.0), Linux, DevOps.\n"
"- אבטחה: בדיקות מורשות, הקשחה, ניטור, זיהוי, ולידציה, אוטומציה פנימית.\n"
"\n"
"כללי תשובה (חובה):\n"
"1) תמיד תספק פתרון מעשי עם קוד מלא או סקלטון עובד.\n"
"2) אם אין תשובה מוכנה — תן סקלטון מלא + הסבר מה להוסיף ואיפה.\n"
"3) ציין נתיבי קבצים מדויקים והוראות הרצה.\n"
"4) אם נדרש — החזר כמה קבצים (API + SQL + config).\n"
"\n"
"פנטסטינג / אבטחה:\n"
"- אם הבקשה התקפית (ניצול, עקיפה, פגיעה):\n"
" * אל תיתן payload או exploit.\n"
" * כן תיתן קוד מלא ל:\n"
" – בדיקות חוקיות\n"
" – סימולציה במעבדה\n"
" – זיהוי חולשה\n"
" – הקשחה ומניעה\n"
"\n"
"פורמט פלט — חובה JSON בלבד:\n"
"{\n"
' "summary": "תקציר קצר",\n'
' "decision": "direct | defensive-alternative",\n'
' "steps": ["צעד 1", "צעד 2"],\n'
' "files": [\n'
' {"path": "path/file.ext", "language": "php|python|bash|sql|js|conf", "code": "…"}\n'
" ],\n"
' "tests": ["פקודת בדיקה"],\n'
' "notes": ["הסברים משלימים"]\n'
"}\n"
"\n"
"חובה:\n"
"- לעולם אל תחזיר טקסט חופשי מחוץ ל-JSON.\n"
"- לעולם אל תחזיר סירוב.\n"
)
הרעיון:
במקום שהמודל “יחליט איך לענות כל פעם מחדש”, אתה בונה מנוע תשובה דטרמיניסטי שמייצר תמיד את אותם חלקים (Sections) — והמודל רק “ממלא” אותם לפי ה־RAG.
בנוסף: אם השאלה התקפית/לא מורשית, במקום “Sorry” — אתה מחזיר אותו פורמט אבל ב־decision=defensive-alternative עם בדיקות מעבדה/הקשחה/ניטור.
להלן פתרון מעשי, מלא, עם קוד.
מה תקבל אחרי זה
כל תשובה תחזור תמיד כ־JSON אחיד:
summary
decision (direct / defensive-alternative)
playbook (section קבועים לפנטסטינג)
files[] (כמה “ווידג’טים” של קוד מלא)
tests[]
notes[]
שלב 1) צור קובץ Playbook קבוע לפנטסטינג
צור קובץ חדש:
nano /root/ai_coder/pentest_playbook.py
הדבק:
=========================================================
Pentest Answer Playbook (קבוע) — בניית מבנה תשובה דטרמיניסטי
Author: pablo guides
=========================================================
from future import annotations
from dataclasses import dataclass
from typing import Dict, List, Literal, Optional
import re
Decision = Literal["direct", "defensive-alternative"]
קטגוריות עיקריות — תוכל להרחיב
CATEGORIES = [
"recon", # איסוף מידע/מיפוי מורשה
"web_testing", # בדיקות אבטחת ווב (דפנסיבי/מעבדה)
"sql_security", # אבטחת DB / SQL (דפנסיבי)
"auth_session", # אימות/Session/JWT (דפנסיבי)
"hardening", # הקשחה
"detection_logging", # ניטור/לוגים
"reporting", # כתיבת ממצאים/ריפורט
"automation", # אוטומציות פנימיות
]
מילות מפתח שמרמזות על בקשה התקפית/לא מורשית
OFFENSIVE_HINTS = [
"bypass", "עקיפה", "לעקוף", "פריצה", "hack", "exploit", "payload", "shell",
"meterpreter", "msfvenom", "webshell", "rce", "privilege escalation", "lateral movement",
"steal", "גניבה", "תוקף", "attack", "phishing",
]
def is_offensive_request(question: str) -> bool:
q = (question or "").lower()
return any(h.lower() in q for h in OFFENSIVE_HINTS)
def detect_category(question: str, tags: Optional[List[str]] = None) -> str:
q = (question or "").lower()
tags = tags or []
# התאמות מהירות לפי tags
if "nmap" in tags or "network" in tags:
return "recon"
if "web" in tags:
return "web_testing"
if "sql" in tags:
return "sql_security"
if "metasploit" in tags:
# metasploit לרוב יוביל לחלופה דפנסיבית
return "automation"
# התאמות לפי טקסט
if any(x in q for x in ["nmap", "ports", "scan", "סריקה", "פורטים"]):
return "recon"
if any(x in q for x in ["xss", "csrf", "ssrf", "cors", "cookie", "session", "jwt", "burp"]):
return "web_testing"
if any(x in q for x in ["sql", "mysql", "pdo", "utf8mb4", "index", "שאילתה"]):
return "sql_security"
if any(x in q for x in ["nginx", "ufw", "iptables", "ssh", "hardening", "הקשחה"]):
return "hardening"
if any(x in q for x in ["log", "logging", "audit", "siem", "ניטור", "לוגים"]):
return "detection_logging"
if any(x in q for x in ["report", "cvss", "finding", "ריפורט", "ממצא"]):
return "reporting"
return "automation"
def build_playbook(category: str, decision: Decision) -> Dict[str, str]:
"""
מחזיר sections קבועים — המודל ימלא תוכן, אבל המבנה קבוע.
"""
# Author: pablo guides
common = {
"scope": "הגדר תחום בדיקה: יעד מורשה/מעבדה, מה מותר ומה אסור, ומה מטרת הבדיקה.",
"assumptions": "הנחות: מערכת/גרסאות/גישה/תפקידים/רשת. אם חסר, נציין מה להשלים.",
"safe_method": "שיטה בטוחה: בדיקות לא הרסניות קודם, rate-limit, תיעוד ראיות.",
"mitigation": "הקשחה/מניעה: מה לשנות כדי למנוע את הסיכון.",
"detection": "ניטור: איזה לוגים לבדוק, אילו חוקים/התראות להגדיר.",
"evidence": "ראיות: מה לתעד (timestamps, request/response, configs), בלי לחשוף מידע רגיש.",
"next_steps": "צעדים הבאים: איך להמשיך בצורה מסודרת.",
}
if decision == "defensive-alternative":
common["note_policy"] = (
"הבקשה נשמעת התקפית/לא מורשית — מספק חלופה דפנסיבית/מעבדתית עם קוד מלא "
"לבדיקה חוקית, זיהוי והקשחה."
)
else:
common["note_policy"] = "מענה ישיר במסגרת בדיקה מורשית/דפנסיבית, עם קוד מלא."
# הרחבות לפי קטגוריה
if category == "recon":
common.update({
"recon_plan": "תכנית Recon מורשית: מיפוי נכסים, DNS, בדיקות פורטים בטוחות, זיהוי שירותים.",
"tools": "כלים מומלצים: nmap (בטוח), dig, ss, netstat, curl. דגש על שימושים מורשים.",
})
elif category == "web_testing":
common.update({
"web_plan": "בדיקות ווב בטוחות: headers, auth/session, input validation, rate-limit, CSP/CORS.",
"tools": "כלים: curl/httpie, Burp (למעבדה), OWASP ZAP (למעבדה), בדיקות יחידה.",
})
elif category == "sql_security":
common.update({
"db_plan": "בדיקות DB: prepared statements, least privilege, אינדקסים, audit, backups.",
"tools": "כלים: mysql client, EXPLAIN, slow query log, schema review.",
})
elif category == "hardening":
common.update({
"hardening_plan": "הקשחה: UFW/Nginx, TLS, SSH hardening, system updates, permissions.",
})
elif category == "detection_logging":
common.update({
"logging_plan": "ניטור: הגדרת לוגים, שימור, קורלציה, התראות, דשבורד.",
})
elif category == "reporting":
common.update({
"report_template": "תבנית ריפורט: Summary, Impact, Evidence, Repro (מורשה), Mitigation, CVSS.",
})
else:
common.update({
"automation_plan": "אוטומציה: סקריפטים לבדיקות חוקיות, איסוף לוגים, בדיקות קונפיגורציה.",
})
return common
def refusal_detected(text: str) -> bool:
t = (text or "").lower()
patterns = [
"sorry", "can't assist", "cannot assist", "i can't help",
"לא יכול לעזור", "איני יכול לעזור", "לא אוכל לסייע"
]
return any(p in t for p in patterns)
def minimal_fallback_files(category: str) -> List[Dict[str, str]]:
"""
אם המודל לא החזיר קוד — נייצר סקלטון מינימלי (מגן/מעבדה).
"""
# Author: pablo guides
files: List[Dict[str, str]] = []
if category == "recon":
files.append({
"path": "/root/ai_coder/tools/recon_safe_scan.sh",
"language": "bash",
"code": (
"#!/usr/bin/env bash\n"
"# Recon בטוח (מורשה) — סריקה עדינה\n"
"# Author: pablo guides\n"
"set -euo pipefail\n\n"
"TARGET=\"${1:-}\"\n"
"if [[ -z \"$TARGET\" ]]; then\n"
" echo \"Usage: $0 \" >&2\n"
" exit 1\n"
"fi\n\n"
"echo \"[*] Safe scan for: $TARGET\"\n"
"nmap -sT –top-ports 200 -T2 –max-retries 2 –host-timeout 5m \"$TARGET\" -oN scan.txt\n"
"echo \"[+] Saved: scan.txt\"\n"
)
})
elif category == "sql_security":
files.append({
"path": "/home/raviti/htdocs/raviti.net/code/php/pdo_safe_example.php",
"language": "php",
"code": (
" PDO::ERRMODE_EXCEPTION,\n"
" PDO::ATTR_DEFAULT_FETCH_MODE => PDO::FETCH_ASSOC,\n"
" PDO::ATTR_EMULATE_PREPARES => false,\n"
"]);\n\n"
"$q = $_GET['q'] ?? ";\n"
"$stmt = $pdo->prepare('SELECT id,title FROM items WHERE title LIKE :q LIMIT 20');\n"
"$stmt->execute([':q' => \"%$q%\"]);\n"
"echo json_encode($stmt->fetchAll(), JSON_UNESCAPED_UNICODE);\n"
"?>\n"
)
})
else:
files.append({
"path": "/root/ai_coder/tools/checklist_defensive.md",
"language": "text",
"code": (
"# צ'קליסט דפנסיבי (מורשה)\n"
"Author: pablo guides\n\n"
"- אימות והרשאות\n"
"- ולידציה/סניטיזציה\n"
"- Rate limiting\n"
"- לוגים וניטור\n"
"- TLS/Headers\n"
"- DB least privilege\n"
)
})
return files
שלב 2) חבר את ה־Playbook ל־rag_api.py
פתח:
nano /root/ai_coder/rag_api.py
2.1 הוסף imports למעלה
הוסף בשורה עם ה־imports:
from pentest_playbook import (
is_offensive_request,
detect_category,
build_playbook,
refusal_detected,
minimal_fallback_files,
)
2.2 עדכן את /ask כך שתמיד יחזיר תשובה קבועה
ב־endpoint /ask, אחרי context, sources = retrieve(…) תוסיף לוגיקה:
החלטה decision
קטגוריה
playbook קבוע
אם המודל מחזיר סירוב → מחליפים אוטומטית לפלט דפנסיבי עם קבצים
דוגמה מלאה של הבלוק (אתה יכול להדביק במקום ההחזרה הקודמת):
=========================================================
/ask — תשובה קבועה לפנטסטינג + fallback ללא "Sorry"
Author: pablo guides
=========================================================
@app.post("/ask")
async def ask(req: Request, body: AskBody):
t0 = time.time()
ip = req.client.host if req.client else None
ua = req.headers.get("user-agent")
question = (body.question or "").strip()
context, sources = retrieve(question)
# החלטה: אם זה נשמע התקפי, עוברים ל"חלופה דפנסיבית"
offensive = is_offensive_request(question)
decision = "defensive-alternative" if offensive else "direct"
# קטגוריה (אפשר גם לשלב tags מה-retrieve בעתיד)
category = detect_category(question, tags=None)
# playbook קבוע (Sections)
playbook = build_playbook(category, decision)
# קריאה למודל — מצפה שיחזיר JSON מובנה לפי ה-SYSTEM_POLICY שלך
try:
obj = call_llm(question, context)
except Exception:
obj = {}
# אם המודל החזיר טקסט במקום JSON/או סירוב — נחליף אוטומטית
raw_text = ""
if isinstance(obj, str):
raw_text = obj
elif isinstance(obj, dict):
# לפעמים המודל יחזיר "answer" במקום המבנה שלנו
raw_text = str(obj.get("summary") or obj.get("answer") or "")
# fallback במקרה של סירוב/מבנה לא תקין
if (isinstance(obj, dict) and not obj.get("files")) or refusal_detected(raw_text):
obj = {
"summary": "נוצר מענה דפנסיבי קבוע (fallback) כדי לספק קוד ותהליך עבודה בטוח.",
"decision": decision,
"category": category,
"steps": [
"הגדר יעד מורשה/מעבדה ותחום הבדיקה.",
"הרץ בדיקות בטוחות ולא הרסניות קודם.",
"אסוף ראיות ולוגים.",
"יישם הקשחה ומניעה בהתאם לממצאים.",
"הפעל ניטור והתראות.",
],
"playbook": playbook,
"files": minimal_fallback_files(category),
"tests": [
"echo 'Run tests relevant to the generated files'",
],
"notes": [
"אם תרצה דיוק גבוה יותר: הוסף RAG ממקורות איכותיים (Kali/OWASP) והריץ ingest.",
"לשאלות שדורשות התאמה לפרויקט שלך: הוסף קוד המקור/קונפיגים ל-RAG.",
],
}
else:
# נוודא שיש את השדות הקבועים גם בתשובה “רגילה”
if isinstance(obj, dict):
obj.setdefault("decision", decision)
obj.setdefault("category", category)
obj.setdefault("playbook", playbook)
latency_ms = int((time.time() – t0) * 1000)
# שמירה ל-DB (אם יש לך)
try:
con = db_conn()
with con.cursor() as cur:
cur.execute(
"""
INSERT INTO ai_queries
(client_ip, user_agent, model_name, question, answer, retrieved_sources, latency_ms)
VALUES (%s,%s,%s,%s,%s,%s,%s)
""",
(
ip, ua, MODEL_NAME,
question,
json.dumps(obj, ensure_ascii=False),
json.dumps(sources, ensure_ascii=False),
latency_ms
)
)
except Exception:
pass
return {"result": obj, "sources": sources, "latency_ms": latency_ms}
שלב 3) ריסטארט ל־API
screen -S ragapi -X quit
screen -S ragapi -dm bash -lc 'cd /root/ai_coder && uvicorn rag_api:app –host 127.0.0.1 –port 8000'
שלב 4) איך “מלמדים” אותו להיות חכם יותר בפנטסטינג (מהיר)
ה-playbook יבטיח פורמט וקוד fallback, אבל כדי שיהיה “חכם”, תעשה ingest איכותי:
הורד דפי כלים/דוקומנטציה לתיקייה שלך
הרץ:
cd /root/ai_coder
python3 ingest_html_pages.py
python3 ingest_repo.py
המערכת תשתפר משמעותית בלי Fine-tune.
הערה חשובה על “תשובות התקפיות”
המערכת שלך תענה תמיד, אבל אם מישהו שואל משהו שבאמת יכול לפגוע (ניצול/עקיפה), היא תענה בפורמט הקבוע עם:
בדיקות מורשות/מעבדה
הקשחה
ניטור
ריפורטינג
ולא עם payloads/ניצול.
1) Category → שליטה על TOP_K + where tags ב-Chroma (ממוקד)
1.1 עדכון ל־rag_api.py
פתח:
nano /root/ai_coder/rag_api.py
A) הוסף הגדרות “TOP_K לפי קטגוריה” + “תגיות לפי קטגוריה”
חפש מקום נוח אחרי משתני ה־ENV (ליד TOP_K = …) והדבק:
=========================================================
TOP_K לפי קטגוריה + תגיות מועדפות ל-Chroma where
Author: pablo guides
=========================================================
TOP_K_DEFAULT = int(os.getenv("TOP_K", "6"))
TOP_K_BY_CATEGORY = {
"recon": int(os.getenv("TOP_K_RECON", "10")),
"web_testing": int(os.getenv("TOP_K_WEB", "8")),
"sql_security": int(os.getenv("TOP_K_SQL", "10")),
"hardening": int(os.getenv("TOP_K_HARDENING", "6")),
"detection_logging": int(os.getenv("TOP_K_LOGGING", "6")),
"reporting": int(os.getenv("TOP_K_REPORTING", "6")),
"automation": int(os.getenv("TOP_K_AUTOMATION", "6")),
}
TAGS_BY_CATEGORY = {
"recon": ["nmap", "network", "linux"],
"web_testing": ["web", "javascript", "secure_coding"],
"sql_security": ["sql", "mysql", "php", "secure_coding"],
"hardening": ["linux", "network"],
"detection_logging": ["linux", "reporting"],
"reporting": ["reporting"],
"automation": ["automation"],
}
def pick_top_k(category: str) -> int:
# Author: pablo guides
return int(TOP_K_BY_CATEGORY.get(category, TOP_K_DEFAULT))
def pick_tag_filters(category: str) -> list[str]:
# Author: pablo guides
return TAGS_BY_CATEGORY.get(category, [])
B) עדכן את retrieve() לרטריבול ממוקד (2-pass fallback)
המטרה:
Pass 1: אם יש קטגוריה → ננסה where לפי tag (כדי לצמצם רעש)
Pass 2: אם לא חזר מספיק → fallback ללא פילטר
חפש את הפונקציה retrieve(question: str) והחלף אותה בזו:
=========================================================
retrieve — רטריבול ממוקד לפי category/tags + fallback
Author: pablo guides
=========================================================
def retrieve(question: str, category: str | None = None):
qemb = embedder.encode(question, normalize_embeddings=True).tolist()
cat = category or "automation"
top_k = pick_top_k(cat)
wanted_tags = pick_tag_filters(cat)
def query_chroma(where=None, n=top_k):
# Author: pablo guides
if where:
return col.query(query_embeddings=[qemb], n_results=n, where=where)
return col.query(query_embeddings=[qemb], n_results=n)
res = None
# Pass 1: נסה פילטר תגיות (תלוי גרסה, לכן try)
if wanted_tags:
for t in wanted_tags:
try:
# נסה $contains (בחלק מהגרסאות נתמך)
res = query_chroma(where={"tags": {"$contains": t}}, n=top_k)
docs = res.get("documents", [[]])[0]
if docs:
break
except Exception:
res = None
# Pass 2: fallback ללא פילטר
if not res:
res = query_chroma(where=None, n=top_k)
docs = res.get("documents", [[]])[0]
metas = res.get("metadatas", [[]])[0]
sources = []
context_parts = []
for d, m in zip(docs, metas):
path = m.get("path", "")
a = m.get("start_line", "")
b = m.get("end_line", "")
url = m.get("url", "")
tags = m.get("tags", [])
src = f"{path}:{a}-{b}"
if url:
src += f" | {url}"
if tags:
src += f" | tags={tags}"
sources.append(src)
context_parts.append(f"[{src}]\n{d}\n")
return "\n".join(context_parts), sources
C) עדכן את /ask כדי להעביר category ל־retrieve
בתוך /ask, במקום:
context, sources = retrieve(question)
שים:
context, sources = retrieve(question, category=category)
1.2 (רשות) הוסף משתנים ל־.env לשליטה
פתח:
nano /root/ai_coder/.env
הוסף (אם אין):
Author: pablo guides
TOP_K=6
TOP_K_RECON=10
TOP_K_WEB=8
TOP_K_SQL=10
2) recon: סקריפט bash בטוח + parser לתוצאות (JSON)
2.1 סקריפט סריקה בטוחה: safe_recon_scan.sh
צור תיקייה:
mkdir -p /root/ai_coder/tools/recon
nano /root/ai_coder/tools/recon/safe_recon_scan.sh
הדבק:
#!/usr/bin/env bash
=========================================================
Recon בטוח (מורשה) — Nmap עדין + יצוא XML/Normal
Author: pablo guides
=========================================================
set -euo pipefail
TARGET="${1:-}"
OUTDIR="${2:-/root/ai_coder/reports/recon}"
PORTS="${3:-top200}" # top200 | top1000 | 1-1024
if [[ -z "$TARGET" ]]; then
echo "Usage: $0 [outdir] [top200|top1000|1-1024]" >&2
exit 1
fi
mkdir -p "$OUTDIR"
TS="$(date +%Y%m%d_%H%M%S)"
BASE="$OUTDIR/${TARGET//[:/]/}$TS"
case "$PORTS" in
top200)
PORT_ARG="–top-ports 200"
;;
top1000)
PORT_ARG="–top-ports 1000"
;;
*)
PORT_ARG="-p $PORTS"
;;
esac
echo "[] TARGET: $TARGET"
echo "[] OUT: $BASE"
echo "[] PORTS: $PORTS"
echo "[] Running safe scan (TCP connect, low timing)…"
סריקה "עדינה" יחסית: TCP connect + T2, retries נמוך, timeout
nmap -sT $PORT_ARG -T2 –max-retries 2 –host-timeout 6m
-sV –version-light
-oN "${BASE}.nmap.txt"
-oX "${BASE}.nmap.xml"
"$TARGET"
echo "[+] Saved:"
echo " – ${BASE}.nmap.txt"
echo " – ${BASE}.nmap.xml"
תן הרשאה:
chmod +x /root/ai_coder/tools/recon/safe_recon_scan.sh
2.2 Parser שמפיק דו״ח JSON: parse_nmap_xml.py
nano /root/ai_coder/tools/recon/parse_nmap_xml.py
הדבק:
=========================================================
Parser ל-Nmap XML -> JSON Report
Author: pablo guides
=========================================================
import sys
import json
import xml.etree.ElementTree as ET
from pathlib import Path
def main():
if len(sys.argv) < 2:
print("Usage: python3 parse_nmap_xml.py [out.json]", file=sys.stderr)
sys.exit(1)
xml_path = Path(sys.argv[1])
out_path = Path(sys.argv[2]) if len(sys.argv) >= 3 else xml_path.with_suffix(".report.json")
tree = ET.parse(xml_path)
root = tree.getroot()
report = {
"source": str(xml_path),
"hosts": []
}
for host in root.findall("host"):
status = host.findtext("status[@state]")
addr_el = host.find("address")
addr = addr_el.get("addr") if addr_el is not None else None
host_obj = {
"address": addr,
"status": host.find("status").get("state") if host.find("status") is not None else None,
"hostnames": [],
"ports": []
}
for hn in host.findall("./hostnames/hostname"):
host_obj["hostnames"].append(hn.get("name"))
for port in host.findall("./ports/port"):
portid = port.get("portid")
proto = port.get("protocol")
state_el = port.find("state")
svc_el = port.find("service")
host_obj["ports"].append({
"port": int(portid) if portid and portid.isdigit() else portid,
"proto": proto,
"state": state_el.get("state") if state_el is not None else None,
"service": svc_el.get("name") if svc_el is not None else None,
"product": svc_el.get("product") if svc_el is not None else None,
"version": svc_el.get("version") if svc_el is not None else None,
"extrainfo": svc_el.get("extrainfo") if svc_el is not None else None,
})
report["hosts"].append(host_obj)
out_path.write_text(json.dumps(report, ensure_ascii=False, indent=2), encoding="utf-8")
print(f"✅ נכתב דו״ח: {out_path}")
if name == "main":
main()
2.3 שימוש מהיר
סריקה:
/root/ai_coder/tools/recon/safe_recon_scan.sh 127.0.0.1
המרה ל־JSON:
python3 /root/ai_coder/tools/recon/parse_nmap_xml.py /root/ai_coder/reports/recon/
3) web_testing: כלי בדיקות headers/session + report JSON
3.1 כלי בדיקה: web_headers_session_audit.py
צור:
mkdir -p /root/ai_coder/tools/web
nano /root/ai_coder/tools/web/web_headers_session_audit.py
הדבק:
=========================================================
Web Headers + Session/Cookie Audit -> JSON Report (דפנסיבי)
Author: pablo guides
=========================================================
import sys
import json
import time
import requests
from urllib.parse import urlparse
SEC_HEADERS = [
"Strict-Transport-Security",
"Content-Security-Policy",
"X-Content-Type-Options",
"X-Frame-Options",
"Referrer-Policy",
"Permissions-Policy",
]
def parse_set_cookie(sc: str):
parts = [p.strip() for p in sc.split(";")]
nameval = parts[0]
attrs = set(p.lower() for p in parts[1:])
return {
"raw": sc,
"name": nameval.split("=", 1)[0].strip(),
"has_secure": any(a == "secure" for a in attrs),
"has_httponly": any(a == "httponly" for a in attrs),
"samesite": next((p.split("=",1)[1].strip() for p in parts[1:] if p.lower().startswith("samesite=")), None),
"path": next((p.split("=",1)[1].strip() for p in parts[1:] if p.lower().startswith("path=")), None),
"domain": next((p.split("=",1)[1].strip() for p in parts[1:] if p.lower().startswith("domain=")), None),
"max_age": next((p.split("=",1)[1].strip() for p in parts[1:] if p.lower().startswith("max-age=")), None),
}
def main():
if len(sys.argv) < 2:
print("Usage: python3 web_headers_session_audit.py https://example.com [out.json]", file=sys.stderr)
sys.exit(1)
url = sys.argv[1].strip()
out = sys.argv[2].strip() if len(sys.argv) >= 3 else "web_audit.report.json"
t0 = time.time()
s = requests.Session()
r = s.get(url, allow_redirects=True, timeout=25)
final_url = r.url
parsed = urlparse(final_url)
headers = {k: v for k, v in r.headers.items()}
missing = [h for h in SEC_HEADERS if h not in headers]
present = {h: headers.get(h) for h in SEC_HEADERS if h in headers}
set_cookies = []
# requests מאחד לפעמים header, ננסה את שניהם
raw_sc = r.headers.get("Set-Cookie")
if raw_sc:
set_cookies.append(parse_set_cookie(raw_sc))
if hasattr(r.raw, "headers"):
sc_list = r.raw.headers.get_all("Set-Cookie") if hasattr(r.raw.headers, "get_all") else None
if sc_list:
set_cookies = [parse_set_cookie(x) for x in sc_list]
cookie_findings = []
for c in set_cookies:
if parsed.scheme == "https":
if not c["has_secure"]:
cookie_findings.append(f"Cookie {c['name']} חסר Secure")
if not c["has_httponly"]:
cookie_findings.append(f"Cookie {c['name']} חסר HttpOnly")
ss = (c["samesite"] or "").lower()
if ss not in ("lax", "strict", "none"):
cookie_findings.append(f"Cookie {c['name']} SameSite לא מוגדר/לא תקין")
if ss == "none" and not c["has_secure"]:
cookie_findings.append(f"Cookie {c['name']} SameSite=None אבל בלי Secure (לא תקין בדפדפנים)")
report = {
"target": url,
"final_url": final_url,
"status_code": r.status_code,
"elapsed_ms": int((time.time() – t0) * 1000),
"security_headers": {
"present": present,
"missing": missing,
},
"cookies": set_cookies,
"findings": {
"cookie_issues": cookie_findings,
},
"recommendations": [
"להוסיף HSTS (ב-HTTPS) במידה ומתאים: Strict-Transport-Security",
"להגדיר CSP בסיסי לפחות ולשפר בהדרגה",
"להוסיף X-Content-Type-Options: nosniff",
"להוסיף Referrer-Policy",
"להוסיף Permissions-Policy לפי הצורך",
"לקוקיז סשן: Secure+HttpOnly+SameSite=Lax/Strict",
]
}
with open(out, "w", encoding="utf-8") as f:
json.dump(report, f, ensure_ascii=False, indent=2)
print(f"✅ דו״ח נכתב: {out}")
if name == "main":
main()
התקנה (אם חסר):
pip3 install -U –break-system-packages requests
הרצה:
python3 /root/ai_coder/tools/web/web_headers_session_audit.py https://example.com /root/ai_coder/reports/web_audit.example.json
4) sql_security: סט SQL “IF NOT EXISTS” לאינדקסים + PHP PDO wrapper מלא
4.1 חשוב: MySQL 8.0 לא נותן CREATE INDEX IF NOT EXISTS באופן סטנדרטי
הפתרון המקצועי: Stored Procedure שבודקת information_schema.statistics ומייצרת אינדקס רק אם חסר.
צור קובץ:
mkdir -p /root/ai_coder/sql
nano /root/ai_coder/sql/sql_security_indexes.sql
הדבק:
— =========================================================
— SQL Security + Indexes "IF NOT EXISTS" ל-MySQL 8.0
— Author: pablo guides
— =========================================================
— 1) טבלה ללוגים (אופציונלי)
CREATE TABLE IF NOT EXISTS security_audit_log (
id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT,
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
action VARCHAR(64) NOT NULL,
details JSON NULL,
PRIMARY KEY (id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
DELIMITER $$
— 2) פרוצדורה: יצירת אינדקס רק אם לא קיים
CREATE PROCEDURE IF NOT EXISTS ensure_index(
IN p_schema VARCHAR(64),
IN p_table VARCHAR(64),
IN p_index VARCHAR(64),
IN p_sql TEXT
)
BEGIN
DECLARE idx_count INT DEFAULT 0;
SELECT COUNT(*)
INTO idx_count
FROM information_schema.statistics
WHERE table_schema = p_schema
AND table_name = p_table
AND index_name = p_index;
IF idx_count = 0 THEN
SET @q = p_sql;
PREPARE stmt FROM @q;
EXECUTE stmt;
DEALLOCATE PREPARE stmt;
INSERT INTO security_audit_log(action, details)
VALUES ('create_index', JSON_OBJECT('schema',p_schema,'table',p_table,'index',p_index,'sql',p_sql));
END IF;
END$$
DELIMITER ;
— 3) דוגמאות: התאם שמות טבלאות/עמודות אצלך
— שימוש:
— CALL ensure_index(DATABASE(), 'ai_queries', 'idx_aiq_created', 'CREATE INDEX idx_aiq_created ON ai_queries(created_at)');
— CALL ensure_index(DATABASE(), 'ai_queries', 'idx_aiq_model', 'CREATE INDEX idx_aiq_model ON ai_queries(model_name)');
הרצה:
mysql -uYOUR_USER -pYOUR_PASS YOUR_DB < /root/ai_coder/sql/sql_security_indexes.sql 4.2 PHP PDO Wrapper מלא (PHP 8.3) — /home/raviti/htdocs/raviti.net/code/php/Db.php צור תיקייה: mkdir -p /home/raviti/htdocs/raviti.net/code/php nano /home/raviti/htdocs/raviti.net/code/php/Db.php הדבק: [dm_code_snippet background="yes" background-mobile="yes" slim="yes" line-numbers="no" bg-color="#abb8c3" theme="dark" language="php" wrapped="yes" height="" copy-text="Copy Code" copy-confirmed="Copied"] pdo = new \PDO($dsn, $user, $pass, [
\PDO::ATTR_ERRMODE => \PDO::ERRMODE_EXCEPTION,
\PDO::ATTR_DEFAULT_FETCH_MODE => \PDO::FETCH_ASSOC,
\PDO::ATTR_EMULATE_PREPARES => false,
]);
// קשיח: utf8mb4 + collation מומלץ
$this->pdo->exec("SET NAMES utf8mb4 COLLATE utf8mb4_unicode_ci");
}
public function pdo(): \PDO
{
return $this->pdo;
}
/** SELECT שמחזיר מערך שורות */
public function all(string $sql, array $params = []): array
{
$stmt = $this->pdo->prepare($sql);
$stmt->execute($params);
return $stmt->fetchAll();
}
/** SELECT שמחזיר שורה אחת או null */
public function one(string $sql, array $params = []): ?array
{
$stmt = $this->pdo->prepare($sql);
$stmt->execute($params);
$row = $stmt->fetch();
return $row === false ? null : $row;
}
/** INSERT/UPDATE/DELETE */
public function exec(string $sql, array $params = []): int
{
$stmt = $this->pdo->prepare($sql);
$stmt->execute($params);
return $stmt->rowCount();
}
/** Insert והחזרת lastInsertId */
public function insert(string $sql, array $params = []): string
{
$this->exec($sql, $params);
return $this->pdo->lastInsertId();
}
/** טרנזקציה בטוחה */
public function transaction(callable $fn)
{
$this->pdo->beginTransaction();
try {
$result = $fn($this);
$this->pdo->commit();
return $result;
} catch (\Throwable $e) {
$this->pdo->rollBack();
throw $e;
}
}
/** בניית LIMIT/OFFSET בצורה בטוחה */
public static function limitOffset(int $limit, int $offset): string
{
$limit = max(1, min($limit, 500));
$offset = max(0, $offset);
return " LIMIT {$limit} OFFSET {$offset} ";
}
}
[/dm_code_snippet]
### דוגמה לשימוש (קובץ חדש): `db_test.php`
"`bash
nano /home/raviti/htdocs/raviti.net/code/php/db_test.php
"`
one("SELECT NOW() AS now_time");
echo "DB OK: " . ($row['now_time'] ?? 'no') . PHP_EOL;
הרצה:
"`bash
php -d detect_unicode=0 /home/raviti/htdocs/raviti.net/code/php/db_test.php
"`
—
# 5) איך לחבר את הכל למודל תשובות קבוע (ב־RAG)
עכשיו כשהכלים קיימים, אתה “מאכיל” אותם ל־RAG בשתי דרכים:
1) **ingest_repo.py** על הקוד שלך (כולל tools)
2) **ingest_html_pages.py** על דוקומנטציה (Kali/OWASP)
דוגמה:
"`bash
cd /root/ai_coder
python3 ingest_repo.py
python3 ingest_html_pages.py
"`
ואז שאלות כמו:
– “תן סקריפט recon בטוח + דו״ח JSON”
– “בדוק headers וסשן והחזר דו״ח”
– “תן אינדקסים מומלצים ושפר שאילתות ב־MySQL”
המודל כבר יחזיר קבצים מלאים.
—
# 6) ריסטארט לשירות API
אחרי עדכוני rag_api:
screen -S ragapi -X quit
screen -S ragapi -dm bash -lc 'cd /root/ai_coder && uvicorn rag_api:app –host 127.0.0.1 –port 8000'
—
## אם תרצה, השלב הבא הכי חזק
אני יכול להוסיף לך “מנתב” בתוך `rag_api.py` שכאשר `category=recon` הוא **אוטומטית** מחזיר:
– `files`: safe_recon_scan.sh + parse_nmap_xml.py
– `tests`: פקודות הרצה
– ואז המודל רק “ממלא” ערכים (TARGET/OUTDIR) ומסביר
אותו דבר ל־web/sql.
תגיד אם אתה רוצה את זה “אוטומטי תמיד” (Router קשיח) או “חצי אוטומטי” (רק כשמבקשים).
::contentReference[oaicite:0]{index=0}
