"""
title: WebSword
description: Universal website ingestion & navigation manifest for LLMs (OWUI internal).
author: Jan + ChatGPT + Claude
version: 0.3-dev
license: MIT
requirements: requests, pydantic, trafilatura, lxml, playwright, playwright-stealth
Changelog:
0.1.0 - initial release
0.1.1 - comprehensive debug logging, NoneType guards
0.2-dev - Markdown media rendering, Excalibur BFS-crawler,
stealth + intercept fetch strategies (stubs)
0.3-dev - MAJOR REFACTOR: unified mode-based API
            ws_stealth_scrape / ws_intercept_scrape removed
            ws_ingest(mode=) is the central entry point:
                mode: auto | static | rendered | stealth | intercept | cdp
            excalibur(mode=) uses the same mode parameter
            _acquire() is the central dispatcher for all fetch strategies
            CDP / accessibility tree prepared as a stub (mode='cdp')
Fetch strategies (all selected via mode=):
    static    - plain HTTP fetch, trafilatura extraction
    rendered  - Playwright without stealth (stub)
    stealth   - Playwright + playwright-stealth (stub)
    intercept - Playwright + XHR/JSON sniffer (stub)
    cdp       - Playwright + Chrome DevTools Protocol / accessibility tree (stub)
    auto      - static first; too little text -> stealth fallback (when enabled)
Public API:
    ws_ingest(url, mode='auto')   - fetch & store a page
    ws_map(site_id)               - show structure
    ws_get(site_id, chunk_id)     - read a chunk
    ws_search(site_id, query)     - full-text search
    ws_list_media(site_id)        - list media
    ws_embed(site_id, media_id)   - embed media
    excalibur(url, mode='auto')   - deep-crawl an entire site
    ws_dump(site_id)              - debug: full manifest
"""
import re
import time
import html
import hashlib
import logging
from typing import Any, Callable, Dict, List, Optional, Tuple
from urllib.parse import urljoin
import requests
from pydantic import BaseModel, Field
# ─────────────────────────────────────────────
# Logging
# ─────────────────────────────────────────────
logging.basicConfig(level=logging.DEBUG)
log = logging.getLogger("WebSword")
VALID_MODES = ("auto", "static", "rendered", "stealth", "intercept", "cdp")
def _log_entry(fn: str, **kwargs):
log.debug("[WebSword] ▶ %s | %s", fn, kwargs)
def _log_exit(fn: str, summary: str = ""):
log.debug("[WebSword] ◀ %s | %s", fn, summary)
def _log_step(fn: str, step: str, **kwargs):
log.debug("[WebSword] ⟳ %s | %s | %s", fn, step, kwargs)
# ─────────────────────────────────────────────
# EventEmitter
# ─────────────────────────────────────────────
class EventEmitter:
    def __init__(self, event_emitter: Optional[Callable[[dict], Any]] = None):
self.event_emitter = event_emitter
async def status(self, description: str, done: bool = False):
if self.event_emitter:
await self.event_emitter(
{"type": "status", "data": {"description": description, "done": done}}
)
async def error(self, description: str):
if self.event_emitter:
await self.event_emitter(
{"type": "status", "data": {"description": f"{description}", "done": True}}
)
# ─────────────────────────────────────────────
# Module-level helpers
# ─────────────────────────────────────────────
def _now_unix() -> int:
return int(time.time())
def _sha256(text: str) -> str:
return hashlib.sha256(text.encode("utf-8")).hexdigest()
def _truncate_bytes(s: str, max_bytes: int) -> str:
b = s.encode("utf-8", errors="ignore")
return s if len(b) <= max_bytes else b[:max_bytes].decode("utf-8", errors="ignore")
def _basic_visible_text_estimate(html_text: str) -> int:
t = re.sub(r"(?is)<(script|style|noscript)\b.*?>.*?</\1>", " ", html_text)
t = re.sub(r"(?s)<[^>]+>", " ", t)
t = html.unescape(t)
return len(re.sub(r"\s+", " ", t).strip())
def _looks_like_needs_render(html_text: str, min_len: int) -> bool:
text_len = _basic_visible_text_estimate(html_text)
needs = text_len < min_len
log.debug("[WebSword] _looks_like_needs_render | visible=%d min=%d -> %s", text_len, min_len, needs)
return needs
def _extract_title_basic(html_text: str) -> str:
m = re.search(r"(?is)<title[^>]*>(.*?)</title>", html_text)
if not m:
return ""
return re.sub(r"\s+", " ", html.unescape(m.group(1))).strip()[:200]
def _safe_domain(url: str) -> str:
m = re.match(r"^https?://([^/]+)/?", (url or "").strip())
return m.group(1).lower() if m else ""
def _normalize_url(u: str) -> str:
return u.split("#")[0].rstrip("/")
def _resolve_url(href: str, current: str) -> str:
    # urljoin correctly resolves absolute, protocol-relative, root-relative
    # and path-relative hrefs, including base URLs without a path component
    # (e.g. "https://example.com" + "page").
    return urljoin(current, href)
# ─────────────────────────────────────────────
# In-memory store
# ─────────────────────────────────────────────
_WEBSWORD_STORE: Dict[str, Dict[str, Any]] = {}
# ─────────────────────────────────────────────
# Empty fetch_meta factory
# ─────────────────────────────────────────────
def _empty_fetch_meta(url: str, mode: str) -> Dict[str, Any]:
return {
"mode": mode,
"url": url,
"final_url": url,
"status_code": 0,
"content_type": "",
"headers": {},
"html": "",
"intercepted_json": [],
"accessibility_tree": None,
"screenshot_b64": None,
"fetched_at_unix": _now_unix(),
}
# ─────────────────────────────────────────────
# Tools class
# ─────────────────────────────────────────────
class Tools:
class Valves(BaseModel):
# HTTP
        USER_AGENT: str = Field(
            default="WebSword/0.3 (+open-webui)",
            description="User-Agent for HTTP requests.",
        )
        TIMEOUT_S: int = Field(default=15, description="HTTP timeout in seconds.")
        MAX_HTML_BYTES: int = Field(
            default=2_000_000, description="Max. bytes of fetched HTML."
        )
        # Feature flags
        ENABLE_INGEST: bool = Field(default=True, description="Allow ws_ingest.")
        ENABLE_MAP: bool = Field(default=True, description="Allow ws_map.")
        ENABLE_GET: bool = Field(default=True, description="Allow ws_get.")
        ENABLE_SEARCH: bool = Field(default=True, description="Allow ws_search.")
        ENABLE_MEDIA: bool = Field(default=True, description="Allow ws_list_media / ws_embed.")
        ENABLE_EXCALIBUR: bool = Field(
            default=False,
            description="Excalibur deep-crawl. DISABLED by default; can trigger many requests.",
        )
        # Playwright modes (all default=False)
        ENABLE_RENDERED_FETCH: bool = Field(
            default=False,
            description="mode='rendered': Playwright without stealth. Requires: playwright install chromium.",
        )
        ENABLE_STEALTH_SCRAPE: bool = Field(
            default=False,
            description="mode='stealth': Playwright + playwright-stealth. Requires: playwright install chromium.",
        )
        ENABLE_INTERCEPT_FETCH: bool = Field(
            default=False,
            description=(
                "mode='intercept': Playwright XHR/JSON sniffer. "
                "Platform-independent for SPAs (TikTok, Instagram, Reddit ...). "
                "Requires: playwright install chromium."
            ),
        )
        ENABLE_CDP_FETCH: bool = Field(
            default=False,
            description=(
                "mode='cdp': Playwright + Chrome DevTools Protocol (accessibility tree). "
                "Makes sites without hyperlinks navigable. "
                "Requires: playwright install chromium."
            ),
        )
        ENABLE_STEALTH_SCREENSHOT: bool = Field(
            default=False,
            description="For stealth/cdp/intercept: attach a screenshot as base64 PNG.",
        )
        # Playwright timeouts (shared by all modes)
        PLAYWRIGHT_TIMEOUT_MS: int = Field(
            default=20000,
            description="Playwright page-load timeout in ms (applies to all Playwright modes).",
        )
        PLAYWRIGHT_WAIT_UNTIL: str = Field(
            default="networkidle",
            description="Playwright wait_until: networkidle | domcontentloaded | load | commit.",
        )
        # Intercept-specific
        INTERCEPT_MAX_RESPONSES: int = Field(
            default=20, description="Max. JSON responses the interceptor collects.",
        )
        INTERCEPT_MIN_JSON_BYTES: int = Field(
            default=64, description="Min. bytes of a JSON response (filters out trivial ones).",
        )
        # Auto mode
        AUTO_RENDER_MIN_TEXT_LEN: int = Field(
            default=500,
            description="auto mode: if the visible text is shorter, stealth is tried (when enabled).",
        )
        # Extraction
        TRAFILATURA_FAVOR: bool = Field(
            default=True, description="Prefer trafilatura when its quality is sufficient.",
        )
        TRAFI_MIN_TEXT_LEN: int = Field(
            default=800, description="Min. text length for trafilatura to count as successful.",
        )
        MAX_CHUNK_CHARS: int = Field(default=8000, description="Max. characters per chunk.")
        CHUNK_TARGET_CHARS: int = Field(default=2200, description="Target chunk size.")
        CHUNK_OVERLAP_CHARS: int = Field(default=120, description="Overlap between chunks.")
        MAX_CHUNKS: int = Field(default=12, description="Max. chunks per page.")
        MAX_MEDIA_ITEMS: int = Field(default=40, description="Max. media items.")
        MAX_LINK_ITEMS: int = Field(default=60, description="Max. links.")
        # Embed
        EMBED_ALLOWLIST: List[str] = Field(
            default_factory=lambda: [
                "youtube.com", "www.youtube.com", "youtu.be",
                "player.vimeo.com", "vimeo.com",
                "soundcloud.com", "w.soundcloud.com",
                "open.spotify.com",
            ],
            description="Allowed iframe domains.",
        )
        # Excalibur
        EXCALIBUR_MAX_DEPTH: int = Field(default=2, description="Excalibur: max. crawl depth.")
        EXCALIBUR_MAX_PAGES: int = Field(default=20, description="Excalibur: max. pages.")
        EXCALIBUR_DELAY_S: float = Field(default=0.5, description="Excalibur: pause between requests in seconds.")
        EXCALIBUR_CHUNK_PREVIEW_CHARS: int = Field(
            default=500, description="Excalibur: max. characters per chunk in the compressed output.",
        )
        EXCALIBUR_TOKEN_WARN_THRESHOLD: int = Field(
            default=50000, description="Excalibur: warn when the estimated tokens exceed this value.",
        )
def __init__(self):
self.valves = self.Valves()
# ─────────────────────────────────────────
    # Layer A: fetch strategies (internal)
# ─────────────────────────────────────────
def _fetch_static(self, url: str) -> Tuple[Dict[str, Any], List[Dict[str, Any]]]:
_log_entry("_fetch_static", url=url)
issues = []
headers = {
"User-Agent": self.valves.USER_AGENT,
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
}
try:
r = requests.get(url, headers=headers, timeout=self.valves.TIMEOUT_S, allow_redirects=True)
ct = r.headers.get("Content-Type", "")
raw = r.text if r.encoding else r.content.decode("utf-8", errors="ignore")
raw = _truncate_bytes(raw, self.valves.MAX_HTML_BYTES)
_log_step("_fetch_static", "response", status=r.status_code, ct=ct, html_len=len(raw))
if "text/html" not in ct and "application/xhtml" not in ct:
issues.append({"type": "non_html_content_type", "detail": ct})
meta = _empty_fetch_meta(url, "static")
meta.update({
"final_url": str(r.url),
"status_code": int(r.status_code),
"content_type": ct,
"headers": dict(r.headers),
"html": raw,
})
_log_exit("_fetch_static", f"status={r.status_code} len={len(raw)}")
return meta, issues
except Exception as e:
log.exception("[WebSword] _fetch_static | EXCEPTION: %s", e)
issues.append({"type": "fetch_static_failed", "detail": repr(e)})
return _empty_fetch_meta(url, "static"), issues
def _fetch_rendered(self, url: str) -> Tuple[Dict[str, Any], List[Dict[str, Any]]]:
"""
Playwright ohne stealth — Stub v0.3.
TODO(v0.3):
async with async_playwright() as pw:
browser = await pw.chromium.launch(headless=True)
page = await browser.new_page()
resp = await page.goto(url,
wait_until=self.valves.PLAYWRIGHT_WAIT_UNTIL,
timeout=self.valves.PLAYWRIGHT_TIMEOUT_MS)
raw_html = await page.content()
await browser.close()
"""
_log_entry("_fetch_rendered", url=url)
issues = []
if not self.valves.ENABLE_RENDERED_FETCH:
issues.append({"type": "rendered_disabled", "detail": "ENABLE_RENDERED_FETCH=False"})
return _empty_fetch_meta(url, "rendered"), issues
issues.append({"type": "rendered_not_implemented", "detail": "stub v0.3"})
return _empty_fetch_meta(url, "rendered"), issues
def _fetch_stealth(self, url: str) -> Tuple[Dict[str, Any], List[Dict[str, Any]]]:
"""
        Playwright + playwright-stealth (stub, v0.3).
        Bypasses standard headless detection, canvas fingerprinting, etc.
        Limits: Cloudflare Turnstile, hCaptcha, hard login walls.
TODO(v0.3):
from playwright.async_api import async_playwright
from playwright_stealth import stealth_async
async with async_playwright() as pw:
browser = await pw.chromium.launch(headless=True)
ctx = await browser.new_context(
user_agent=self.valves.USER_AGENT,
viewport={"width": 1280, "height": 800},
locale="de-DE",
)
page = await ctx.new_page()
await stealth_async(page)
resp = await page.goto(url,
wait_until=self.valves.PLAYWRIGHT_WAIT_UNTIL,
timeout=self.valves.PLAYWRIGHT_TIMEOUT_MS)
raw_html = await page.content()
screenshot_b64 = None
if self.valves.ENABLE_STEALTH_SCREENSHOT:
import base64
screenshot_b64 = base64.b64encode(
await page.screenshot(full_page=True)).decode()
await browser.close()
meta = _empty_fetch_meta(url, "stealth")
meta.update({"html": raw_html, "status_code": resp.status,
"screenshot_b64": screenshot_b64, "final_url": page.url})
"""
_log_entry("_fetch_stealth", url=url)
issues = []
if not self.valves.ENABLE_STEALTH_SCRAPE:
issues.append({"type": "stealth_disabled", "detail": "ENABLE_STEALTH_SCRAPE=False"})
return _empty_fetch_meta(url, "stealth"), issues
issues.append({"type": "stealth_not_implemented", "detail": "stub v0.3"})
return _empty_fetch_meta(url, "stealth"), issues
async def _fetch_intercept(self, url: str) -> Tuple[Dict[str, Any], List[Dict[str, Any]]]:
"""
        Playwright + XHR/JSON sniffer (stub, v0.3).
        Passively listens to all JSON API responses while the page loads.
        Platform-independent: TikTok, Instagram, Twitter/X, Reddit, Amazon, ...
TODO(v0.3):
intercepted = []
async def handle_response(response):
ct = response.headers.get("content-type", "")
if "application/json" not in ct:
return
try:
body = await response.body()
if len(body) >= self.valves.INTERCEPT_MIN_JSON_BYTES:
intercepted.append({
"url": response.url,
"status": response.status,
"body": body.decode("utf-8", errors="ignore"),
})
except Exception as e:
log.debug("[WebSword] intercept response error: %s", e)
            from playwright.async_api import async_playwright
            from playwright_stealth import stealth_async
            async with async_playwright() as pw:
browser = await pw.chromium.launch(headless=True)
page = await browser.new_page()
await stealth_async(page)
page.on("response", handle_response)
resp = await page.goto(url,
wait_until=self.valves.PLAYWRIGHT_WAIT_UNTIL,
timeout=self.valves.PLAYWRIGHT_TIMEOUT_MS)
raw_html = await page.content()
await browser.close()
meta = _empty_fetch_meta(url, "intercept")
meta.update({"html": raw_html, "status_code": resp.status if resp else 0,
"intercepted_json": intercepted[:self.valves.INTERCEPT_MAX_RESPONSES],
"final_url": page.url})
"""
_log_entry("_fetch_intercept", url=url)
issues = []
if not self.valves.ENABLE_INTERCEPT_FETCH:
issues.append({"type": "intercept_disabled", "detail": "ENABLE_INTERCEPT_FETCH=False"})
return _empty_fetch_meta(url, "intercept"), issues
issues.append({"type": "intercept_not_implemented", "detail": "stub v0.3"})
return _empty_fetch_meta(url, "intercept"), issues
def _fetch_cdp(self, url: str) -> Tuple[Dict[str, Any], List[Dict[str, Any]]]:
"""
        Playwright + Chrome DevTools Protocol (stub, v0.3).
        Makes sites without hyperlinks navigable:
        - Accessibility tree: semantic structure independent of the visual DOM
        - All interactive elements (buttons, ARIA roles, onClick handlers)
        - Pierces the shadow DOM
        - No classic hyperlink system required
TODO(v0.3):
            from playwright.async_api import async_playwright
            from playwright_stealth import stealth_async
            async with async_playwright() as pw:
browser = await pw.chromium.launch(headless=True)
page = await browser.new_page()
await stealth_async(page)
client = await page.context.new_cdp_session(page)
resp = await page.goto(url,
wait_until=self.valves.PLAYWRIGHT_WAIT_UNTIL,
timeout=self.valves.PLAYWRIGHT_TIMEOUT_MS)
raw_html = await page.content()
# Accessibility Tree
ax_tree = await client.send("Accessibility.getFullAXTree")
                # Inventory all interactive elements
clickables = await page.query_selector_all(
"button, [role='button'], [role='link'], [role='menuitem'], "
"[tabindex], [onclick], a, input[type='submit']"
)
interactive = []
for el in clickables:
try:
label = await el.get_attribute("aria-label") or await el.inner_text()
box = await el.bounding_box()
interactive.append({
"label": (label or "").strip()[:80],
"visible": box is not None,
"tag": await el.evaluate("e => e.tagName"),
})
except Exception:
pass
screenshot_b64 = None
if self.valves.ENABLE_STEALTH_SCREENSHOT:
import base64
screenshot_b64 = base64.b64encode(
await page.screenshot(full_page=True)).decode()
await browser.close()
meta = _empty_fetch_meta(url, "cdp")
meta.update({
"html": raw_html,
"status_code": resp.status if resp else 0,
"final_url": page.url,
"accessibility_tree": ax_tree,
"interactive_elements": interactive,
"screenshot_b64": screenshot_b64,
})
"""
_log_entry("_fetch_cdp", url=url)
issues = []
if not self.valves.ENABLE_CDP_FETCH:
issues.append({"type": "cdp_disabled", "detail": "ENABLE_CDP_FETCH=False"})
return _empty_fetch_meta(url, "cdp"), issues
issues.append({"type": "cdp_not_implemented", "detail": "stub v0.3"})
return _empty_fetch_meta(url, "cdp"), issues
# ─────────────────────────────────────────
    # Layer B: _acquire() — central dispatcher
# ─────────────────────────────────────────
def _acquire(self, url: str, mode: str) -> Tuple[Dict[str, Any], List[Dict[str, Any]]]:
"""
Zentraler Fetch-Dispatcher fuer alle Strategien.
mode: auto | static | rendered | stealth | intercept | cdp
Fallback-Ketten:
auto -> static; bei zu wenig Text -> stealth (wenn aktiv) -> static
rendered -> static (wenn rendered leer)
stealth -> static (wenn stealth leer)
intercept -> stealth (wenn aktiv) -> static
cdp -> stealth (wenn aktiv) -> static
"""
_log_entry("_acquire", url=url, mode=mode)
mode = (mode or "auto").lower().strip()
if mode not in VALID_MODES:
log.warning("[WebSword] _acquire | unknown mode '%s', using auto", mode)
mode = "auto"
def _fallback_static(reason: str, prior_issues: list) -> Tuple[Dict, List]:
log.warning("[WebSword] _acquire | fallback to static: %s", reason)
meta, issues = self._fetch_static(url)
issues = prior_issues + issues
issues.append({"type": "acquire_fallback_static", "detail": reason})
return meta, issues
def _fallback_stealth_then_static(reason: str, prior_issues: list) -> Tuple[Dict, List]:
if self.valves.ENABLE_STEALTH_SCRAPE:
log.warning("[WebSword] _acquire | fallback to stealth: %s", reason)
meta, issues = self._fetch_stealth(url)
issues = prior_issues + issues
issues.append({"type": "acquire_fallback_stealth", "detail": reason})
if (meta.get("html") or "").strip():
return meta, issues
return _fallback_static(reason + "_then_static", prior_issues)
# static
if mode == "static":
meta, issues = self._fetch_static(url)
issues.append({"type": "acquire_mode", "detail": "static"})
_log_exit("_acquire", "static")
return meta, issues
# rendered
if mode == "rendered":
meta, issues = self._fetch_rendered(url)
if not (meta.get("html") or "").strip():
return _fallback_static("rendered_empty", issues)
issues.append({"type": "acquire_mode", "detail": "rendered"})
_log_exit("_acquire", "rendered")
return meta, issues
# stealth
if mode == "stealth":
meta, issues = self._fetch_stealth(url)
if not (meta.get("html") or "").strip():
return _fallback_static("stealth_empty", issues)
issues.append({"type": "acquire_mode", "detail": "stealth"})
_log_exit("_acquire", "stealth")
return meta, issues
# intercept
if mode == "intercept":
            import asyncio
            try:
                try:
                    asyncio.get_running_loop()
                    in_loop = True
                except RuntimeError:
                    in_loop = False
                if in_loop:
                    # Already inside a running event loop (the OWUI case):
                    # run the coroutine via asyncio.run() on a worker thread.
                    import concurrent.futures
                    with concurrent.futures.ThreadPoolExecutor() as pool:
                        future = pool.submit(asyncio.run, self._fetch_intercept(url))
                        meta, issues = future.result(
                            timeout=self.valves.PLAYWRIGHT_TIMEOUT_MS / 1000 + 5)
                else:
                    meta, issues = asyncio.run(self._fetch_intercept(url))
except Exception as e:
log.error("[WebSword] _acquire | intercept call failed: %s", e)
meta = _empty_fetch_meta(url, "intercept")
issues = [{"type": "intercept_call_failed", "detail": repr(e)}]
if not (meta.get("html") or "").strip() and not meta.get("intercepted_json"):
return _fallback_stealth_then_static("intercept_empty", issues)
issues.append({"type": "acquire_mode", "detail": "intercept"})
_log_exit("_acquire", "intercept")
return meta, issues
# cdp
if mode == "cdp":
meta, issues = self._fetch_cdp(url)
if not (meta.get("html") or "").strip():
return _fallback_stealth_then_static("cdp_empty", issues)
issues.append({"type": "acquire_mode", "detail": "cdp"})
_log_exit("_acquire", "cdp")
return meta, issues
# auto
meta, issues = self._fetch_static(url)
_log_step("_acquire", "auto static done",
status=meta.get("status_code"), html_len=len(meta.get("html") or ""))
if meta.get("status_code") in (401, 403):
issues.append({"type": "access_restricted", "detail": f"status={meta['status_code']}"})
html_text = meta.get("html") or ""
if _looks_like_needs_render(html_text, self.valves.AUTO_RENDER_MIN_TEXT_LEN):
if self.valves.ENABLE_STEALTH_SCRAPE:
log.debug("[WebSword] _acquire | auto: static insufficient, trying stealth")
s_meta, s_issues = self._fetch_stealth(url)
if (s_meta.get("html") or "").strip():
s_issues.extend(issues)
s_issues.append({"type": "auto_used_stealth", "detail": "static_insufficient"})
_log_exit("_acquire", "auto->stealth")
return s_meta, s_issues
issues.append({"type": "auto_used_static", "detail": "stealth_inactive_or_failed"})
else:
issues.append({"type": "auto_used_static", "detail": "sufficient_text"})
_log_exit("_acquire", "auto->static")
return meta, issues
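    # Dispatch example (illustrative): _acquire(url, "intercept") with
    # ENABLE_INTERCEPT_FETCH=False gets an empty fetch_meta back from the
    # stub, falls through to stealth (if enabled) and finally to static,
    # recording acquire_fallback_* issues along the way.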
# ─────────────────────────────────────────
    # Layer C: extraction
# ─────────────────────────────────────────
def _extract_links_basic(self, html_text: str) -> List[Dict[str, str]]:
out: List[Dict[str, str]] = []
for m in re.finditer(r'(?is)<a\s+[^>]*href=["\']([^"\']+)["\']', html_text or ""):
href = (m.group(1) or "").strip()
if not href or href.startswith("#") or href.lower().startswith("javascript:"):
continue
out.append({"href": href})
if len(out) >= self.valves.MAX_LINK_ITEMS:
break
return out
def _extract_media_basic(self, html_text: str) -> List[Dict[str, Any]]:
media: List[Dict[str, Any]] = []
for m in re.finditer(
r'(?is)<img\s+[^>]*src=["\']([^"\']+)["\']([^>]*)>', html_text or ""
):
src = (m.group(1) or "").strip()
rest = m.group(2) or ""
alt_m = re.search(r'(?is)\balt=["\']([^"\']*)["\']', rest)
alt = (alt_m.group(1) if alt_m else "").strip()
media.append({"media_id": None, "type": "image", "src": src, "alt": alt})
if len(media) >= self.valves.MAX_MEDIA_ITEMS:
break
if len(media) < self.valves.MAX_MEDIA_ITEMS:
for m in re.finditer(
r'(?is)<iframe\s+[^>]*src=["\']([^"\']+)["\']', html_text or ""
):
src = (m.group(1) or "").strip()
media.append({
"media_id": None, "type": "embed",
"src": src, "provider": _safe_domain(src),
})
if len(media) >= self.valves.MAX_MEDIA_ITEMS:
break
return media
def _make_chunk_id(self, prefix: str, text: str, idx: int) -> str:
h = _sha256(f"{prefix}|{idx}|{text[:120]}")[:10]
return f"c{idx}_{h}"
def _chunk_text(self, text: str) -> List[str]:
_log_entry("_chunk_text", text_len=len(text or ""))
t = (text or "").strip()
if not t:
return []
paras = [p.strip() for p in re.split(r"\n{2,}", t) if p.strip()]
if not paras:
return []
chunks: List[str] = []
buf: List[str] = []
buf_len = 0
target = int(self.valves.CHUNK_TARGET_CHARS)
overlap = int(self.valves.CHUNK_OVERLAP_CHARS)
def flush():
nonlocal buf, buf_len
if not buf:
return
chunk = "\n\n".join(buf).strip()
if chunk:
chunks.append(chunk)
buf, buf_len = [], 0
for p in paras:
plen = len(p)
if buf_len + plen + 2 <= target or not buf:
buf.append(p)
buf_len += plen + 2
else:
flush()
buf.append(p)
buf_len = plen + 2
if len(chunks) >= int(self.valves.MAX_CHUNKS):
break
flush()
if overlap > 0 and len(chunks) > 1:
out = [chunks[0]]
for i in range(1, len(chunks)):
tail = out[-1][-overlap:]
out.append((tail + "\n\n" + chunks[i]).strip())
chunks = out
result = chunks[:int(self.valves.MAX_CHUNKS)]
_log_exit("_chunk_text", f"chunks={len(result)}")
return result
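    # Worked example (hypothetical numbers): with CHUNK_TARGET_CHARS=2200 and
    # CHUNK_OVERLAP_CHARS=120, a text of five ~1000-char paragraphs becomes
    # three chunks (two paragraphs, two paragraphs, one); from the second
    # chunk on, the last 120 chars of the previous chunk are prepended as overlap.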
def _extract_headings_best_effort(self, html_text: str) -> List[str]:
if not html_text:
return []
m = re.search(r"(?is)<(main|article)\b[^>]*>(.*?)</\1>", html_text)
scope = m.group(2) if m else html_text
headings: List[str] = []
for m in re.finditer(r"(?is)<h([1-3])\b[^>]*>(.*?)</h\1>", scope):
raw = re.sub(r"(?s)<[^>]+>", " ", m.group(2))
txt = re.sub(r"\s+", " ", html.unescape(raw)).strip()
if txt and len(txt) >= 3:
headings.append(txt[:160])
if len(headings) >= 24:
break
seen = set()
out = []
for h in headings:
key = h.lower()
if key not in seen:
seen.add(key)
out.append(h)
return out
def _extractor_basic_dom(self, fetch_meta: Dict, issues: List) -> Dict:
_log_entry("_extractor_basic_dom")
if not fetch_meta:
log.error("[WebSword] _extractor_basic_dom | fetch_meta empty")
return {
"title": "", "toc": [], "chunks": [], "links": [], "media": [],
"issues": list(issues) + [{"type": "basic_dom_no_fetch_meta", "detail": "empty"}],
}
html_text = fetch_meta.get("html") or ""
title = _extract_title_basic(html_text) or fetch_meta.get("final_url", fetch_meta.get("url", ""))
cleaned = re.sub(r"(?is)<(script|style|noscript)\b.*?>.*?</\1>", " ", html_text)
txt = re.sub(r"\s+", " ", html.unescape(re.sub(r"(?s)<[^>]+>", " ", cleaned))).strip()
chunk_text = txt[:self.valves.MAX_CHUNK_CHARS]
_log_exit("_extractor_basic_dom", f"title={title[:40]} text_len={len(txt)}")
return {
"title": title,
"toc": [],
"chunks": [{"chunk_id": "c1", "heading": title[:120],
"text": chunk_text, "selectors": [], "links": []}],
"links": self._extract_links_basic(html_text),
"media": self._extract_media_basic(html_text),
"issues": list(issues),
}
def _extractor_readability(self, fetch_meta: Dict, issues: List) -> Dict:
_log_entry("_extractor_readability")
if not fetch_meta:
log.error("[WebSword] _extractor_readability | fetch_meta empty")
return {"issues": list(issues) + [{"type": "readability_no_fetch_meta", "detail": "empty"}]}
html_text = fetch_meta.get("html") or ""
final_url = fetch_meta.get("final_url") or fetch_meta.get("url") or ""
try:
import trafilatura
from trafilatura.metadata import extract_metadata
except Exception as e:
log.error("[WebSword] _extractor_readability | trafilatura missing: %s", e)
return {"issues": list(issues) + [{"type": "trafilatura_missing", "detail": repr(e)}]}
title, lang = "", ""
try:
            md = extract_metadata(html_text, default_url=final_url)
if md:
title = (md.title or "").strip()
lang = (md.language or "").strip()
except Exception as e:
issues = list(issues) + [{"type": "trafilatura_metadata_failed", "detail": repr(e)}]
try:
extracted = (trafilatura.extract(
html_text, url=final_url,
include_comments=False, include_tables=True,
include_links=False, favor_precision=True,
output_format="txt",
) or "").strip()
except Exception as e:
log.error("[WebSword] _extractor_readability | extract failed: %s", e)
return {"issues": list(issues) + [{"type": "trafilatura_extract_failed", "detail": repr(e)}]}
_log_step("_extractor_readability", "extracted", extracted_len=len(extracted))
if len(extracted) < int(self.valves.TRAFI_MIN_TEXT_LEN):
return {
"title": title or _extract_title_basic(html_text) or final_url,
"lang": lang, "toc": [],
"chunks": ([{"chunk_id": "c1", "heading": (title or final_url)[:120],
"text": extracted[:self.valves.MAX_CHUNK_CHARS],
"selectors": [], "links": []}] if extracted else []),
"links": self._extract_links_basic(html_text),
"media": self._extract_media_basic(html_text),
"issues": list(issues) + [{"type": "trafilatura_low_text",
"detail": f"len={len(extracted)}<min={self.valves.TRAFI_MIN_TEXT_LEN}"}],
}
text_chunks = self._chunk_text(extracted)
if not text_chunks:
return {"issues": list(issues) + [{"type": "trafilatura_empty_after_chunk", "detail": "no_chunks"}]}
headings = self._extract_headings_best_effort(html_text)
toc = []
for i, htxt in enumerate(headings, 1):
mapped = None
for ci, ct in enumerate(text_chunks, 1):
if htxt.lower() in ct.lower():
mapped = ci
break
toc.append({"id": f"h{i}", "text": htxt, "chunk_ref": mapped})
base_heading = (title or _extract_title_basic(html_text) or final_url)[:120]
chunks = []
for idx, ctext in enumerate(text_chunks, 1):
cid = self._make_chunk_id("trafi", base_heading, idx)
chunks.append({
"chunk_id": cid,
"heading": base_heading if idx == 1 else f"{base_heading} (part {idx})",
"text": ctext[:self.valves.MAX_CHUNK_CHARS],
"selectors": [], "links": [],
})
for t in toc:
ci = t.pop("chunk_ref", None)
if isinstance(ci, int) and 1 <= ci <= len(chunks):
t["chunk_id"] = chunks[ci - 1]["chunk_id"]
_log_exit("_extractor_readability", f"chunks={len(chunks)} toc={len(toc)}")
return {
"title": title or _extract_title_basic(html_text) or final_url,
"lang": lang, "toc": toc, "chunks": chunks,
"links": self._extract_links_basic(html_text),
"media": self._extract_media_basic(html_text),
"issues": list(issues) + [{"type": "trafilatura_ok",
"detail": f"extracted_len={len(extracted)} chunks={len(chunks)}"}],
}
# ─────────────────────────────────────────
# Layer D: Sieve
# ─────────────────────────────────────────
def _sieve(self, fetch_meta: Dict, issues: List) -> Dict:
_log_entry("_sieve",
html_len=len((fetch_meta or {}).get("html") or ""),
intercepted=len((fetch_meta or {}).get("intercepted_json") or []),
has_ax_tree=bool((fetch_meta or {}).get("accessibility_tree")))
if not fetch_meta:
log.error("[WebSword] _sieve | fetch_meta empty")
return {"title": "", "toc": [], "chunks": [], "links": [], "media": [],
"issues": list(issues) + [{"type": "sieve_no_fetch_meta", "detail": "empty"}]}
        # Inject intercepted JSON as pseudo-HTML so the extractors can see it.
        # The bodies are already JSON text; json.dumps() on a str would only
        # re-quote and escape it, so embed the (HTML-escaped) text directly.
        intercepted = (fetch_meta or {}).get("intercepted_json") or []
        if intercepted and isinstance(fetch_meta, dict):
            json_blob = "\n\n".join(
                html.escape((item.get("body") or "")[:1200])
                for item in intercepted[:10] if item.get("body")
            )
            if json_blob:
                fetch_meta = dict(fetch_meta)
                fetch_meta["html"] = (fetch_meta.get("html") or "") + \
                    "\n<!-- INTERCEPTED_JSON -->\n<pre>" + json_blob + "</pre>"
                log.debug("[WebSword] _sieve | injected %d JSON blocks", len(intercepted))
        # Inject the accessibility tree
ax_tree = (fetch_meta or {}).get("accessibility_tree")
if ax_tree and isinstance(fetch_meta, dict):
import json as _json
ax_text = _json.dumps(ax_tree, ensure_ascii=False)[:3000]
fetch_meta = dict(fetch_meta)
fetch_meta["html"] = (fetch_meta.get("html") or "") + \
"\n<!-- ACCESSIBILITY_TREE -->\n<pre>" + ax_text + "</pre>"
log.debug("[WebSword] _sieve | injected accessibility tree")
        # Score the candidate extractions
basic = self._extractor_basic_dom(fetch_meta, issues)
        basic_text = (basic.get("chunks") or [{}])[0].get("text") or ""
basic_score = min(1.0, len(basic_text) / 2000.0)
rd = self._extractor_readability(fetch_meta, issues)
rd_chunks = rd.get("chunks") or [] if isinstance(rd, dict) else []
rd_text_len = sum(len((c or {}).get("text") or "") for c in rd_chunks)
rd_score = min(1.2, rd_text_len / 3500.0) if rd_text_len else 0.0
if isinstance(rd, dict) and rd.get("toc"):
rd_score += 0.15
if self.valves.TRAFILATURA_FAVOR:
rd_score += 0.10
name, score, chosen = max(
[("basic_dom", basic_score, basic), ("trafilatura", rd_score, rd)],
key=lambda x: x[1]
)
_log_step("_sieve", "winner", name=name, score=round(score, 3))
if not chosen or not isinstance(chosen, dict) or not chosen.get("chunks"):
log.warning("[WebSword] _sieve | winner '%s' empty, forcing basic_dom", name)
chosen = basic
chosen.setdefault("issues", [])
chosen["issues"].append({"type": "sieve_forced_basic", "detail": "winner_empty"})
for i, m in enumerate(chosen.get("media") or [], 1):
m["media_id"] = m.get("media_id") or f"m{i}"
chosen.setdefault("issues", [])
chosen["issues"].append({"type": "sieve_choice",
"detail": {"name": name, "score": round(score, 3)}})
_log_exit("_sieve", f"chosen={name} chunks={len(chosen.get('chunks') or [])}")
return chosen
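    # Scoring example (illustrative): a trafilatura result with 3,000 chars of
    # text, a non-empty TOC and TRAFILATURA_FAVOR=True scores
    # min(1.2, 3000/3500) + 0.15 + 0.10 ≈ 1.11 and beats a basic-DOM chunk of
    # 2,000+ chars, which is capped at 1.0.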
# ─────────────────────────────────────────
# Layer E: Manifest
# ─────────────────────────────────────────
def _build_manifest(self, fetch_meta: Dict, issues: List) -> Dict:
_log_entry("_build_manifest")
page = self._sieve(fetch_meta, issues)
site_id = _sha256(
f"{fetch_meta.get('final_url')}|{fetch_meta.get('fetched_at_unix')}|WebSword0.3"
)[:16]
manifest = {
"producer": "WebSword",
"schema_version": "0.3",
"site_id": site_id,
"base_url": fetch_meta.get("final_url", fetch_meta.get("url")),
"fetched_at_unix": fetch_meta.get("fetched_at_unix"),
"pages": [{
"page_id": "p1",
"url": fetch_meta.get("final_url", fetch_meta.get("url")),
"title": page.get("title", ""),
"lang": page.get("lang", ""),
"toc": page.get("toc", []),
"chunks": page.get("chunks", []),
"links": page.get("links", []),
"media": page.get("media", []),
"issues": page.get("issues", []),
"fetch": {
"mode": fetch_meta.get("mode"),
"status_code": fetch_meta.get("status_code"),
"content_type": fetch_meta.get("content_type", ""),
"intercepted_json_count": len(fetch_meta.get("intercepted_json") or []),
"has_accessibility_tree": bool(fetch_meta.get("accessibility_tree")),
"has_screenshot": bool(fetch_meta.get("screenshot_b64")),
},
}],
"capabilities": {
"modes_available": list(VALID_MODES),
"playwright_modes_enabled": [
m for m, v in [
("rendered", self.valves.ENABLE_RENDERED_FETCH),
("stealth", self.valves.ENABLE_STEALTH_SCRAPE),
("intercept", self.valves.ENABLE_INTERCEPT_FETCH),
("cdp", self.valves.ENABLE_CDP_FETCH),
] if v
],
},
}
_log_exit("_build_manifest", f"site_id={site_id}")
return manifest
# ─────────────────────────────────────────
# Public API
# ─────────────────────────────────────────
async def ws_ingest(
self,
url: str,
mode: str = "auto",
__event_emitter__: Callable[[dict], Any] = None,
) -> str:
"""
Seite fetchen, extrahieren und als Manifest speichern.
mode: auto | static | stealth | intercept | cdp | rendered
auto - statisch; bei zu wenig Text -> stealth (wenn aktiv)
static - normaler HTTP-Fetch
stealth - Playwright + playwright-stealth (Bot-Schutz umgehen)
intercept - Playwright XHR/JSON-Sniffer (TikTok, Instagram, Reddit ...)
cdp - Playwright + Accessibility Tree (Sites ohne Hyperlinks)
rendered - Playwright ohne stealth (einfache SPAs)
Playwright-Modi benoetigen das jeweilige ENABLE_*=True Valve
sowie 'playwright install chromium' im Container.
"""
_log_entry("ws_ingest", url=url, mode=mode)
emitter = EventEmitter(__event_emitter__)
        if not self.valves.ENABLE_INGEST:
            return "❌ ws_ingest is disabled."
        if not url or not isinstance(url, str):
            return "❌ Please provide a valid URL."
        try:
            await emitter.status(f"🗡️ WebSword: mode={mode} | {url}")
fetch_meta, issues = self._acquire(url, mode=mode)
_log_step("ws_ingest", "acquire done",
status=fetch_meta.get("status_code"),
html_len=len(fetch_meta.get("html") or ""),
mode=fetch_meta.get("mode"))
await emitter.status("🧪 Extrahiere Struktur …")
manifest = self._build_manifest(fetch_meta, issues)
site_id = manifest["site_id"]
_WEBSWORD_STORE[site_id] = manifest
page = manifest["pages"][0]
fetch_info = page["fetch"]
await emitter.status("✅ Ingest fertig.", done=True)
fetch_line = (
f"`{fetch_info['mode']}` | HTTP `{fetch_info['status_code']}`"
+ (f" | JSON-Responses: {fetch_info['intercepted_json_count']}"
if fetch_info["intercepted_json_count"] else "")
+ (" | 🌳 AX-Tree" if fetch_info["has_accessibility_tree"] else "")
+ (" | 📸 Screenshot" if fetch_info["has_screenshot"] else "")
)
lines = [
"# 🗡️ WebSword\n",
f"**Site ID:** `{site_id}`",
f"**URL:** {page['url']}",
f"**Title:** {page.get('title', '')}",
f"**Fetch:** {fetch_line}",
"",
f"**Chunks:** {len(page.get('chunks', []))} | "
f"**Media:** {len(page.get('media', []))} | "
f"**Links:** {len(page.get('links', []))}",
"",
"## Next actions",
f"- `ws_map(site_id='{site_id}')`",
f"- `ws_get(site_id='{site_id}', chunk_id='c1')`",
f"- `ws_search(site_id='{site_id}', query='...')`",
f"- `ws_list_media(site_id='{site_id}')`",
"",
"## Available modes",
" `auto` · `static` · `stealth` · `intercept` · `cdp` · `rendered`",
]
issues_short = (page.get("issues") or [])[:6]
if issues_short:
lines += ["", "## Issues (first 6)"]
for it in issues_short:
lines.append(f"- `{it.get('type')}`: {it.get('detail')}")
_log_exit("ws_ingest", f"site_id={site_id}")
return "\n".join(lines)
except Exception as e:
log.exception("[WebSword] ws_ingest | EXCEPTION: %s", e)
msg = f"{type(e).__name__}: {e}"
await emitter.error(msg)
return msg
async def ws_map(
self,
site_id: str,
__event_emitter__: Callable[[dict], Any] = None,
) -> str:
"""Struktur eines gespeicherten Manifests anzeigen."""
_log_entry("ws_map", site_id=site_id)
emitter = EventEmitter(__event_emitter__)
        if not self.valves.ENABLE_MAP:
            return "❌ ws_map is disabled."
        manifest = _WEBSWORD_STORE.get(site_id)
        if not manifest:
            return "❌ Unknown site_id; run ws_ingest first."
page = manifest["pages"][0]
chunks = page.get("chunks", [])
media = page.get("media", [])
lines = [
"# 🧭 WebSword Map\n",
f"**Site ID:** `{site_id}`",
f"**Title:** {page.get('title', '')}",
f"**Fetch mode:** `{page['fetch']['mode']}`",
"", "## Chunks",
]
        for c in chunks[:30]:
            lines.append(
                f"- `{c.get('chunk_id')}` ({len(c.get('text') or '')} chars) — "
                f"{(c.get('heading') or '')[:120]}"
            )
if len(chunks) > 30:
lines.append(f"- … ({len(chunks)-30} more)")
lines += ["", "## Media"]
if not media:
lines.append("- (none)")
else:
for m in media[:30]:
lines.append(f"- `{m.get('media_id')}` **{m.get('type')}** — {(m.get('src') or '')[:100]}")
if len(media) > 30:
lines.append(f"- … ({len(media)-30} more)")
await emitter.status("✅ Map bereit.", done=True)
_log_exit("ws_map", f"chunks={len(chunks)} media={len(media)}")
return "\n".join(lines)
async def ws_get(
self,
site_id: str,
chunk_id: str = "c1",
__event_emitter__: Callable[[dict], Any] = None,
) -> str:
"""Einen Chunk aus dem Manifest lesen."""
_log_entry("ws_get", site_id=site_id, chunk_id=chunk_id)
emitter = EventEmitter(__event_emitter__)
        if not self.valves.ENABLE_GET:
            return "❌ ws_get is disabled."
        manifest = _WEBSWORD_STORE.get(site_id)
        if not manifest:
            return "❌ Unknown site_id."
page = manifest["pages"][0]
for c in page.get("chunks", []):
if c.get("chunk_id") == chunk_id:
await emitter.status("✅ Chunk geladen.", done=True)
_log_exit("ws_get", f"chunk_id={chunk_id} len={len(c.get('text',''))}")
return f"# 📄 {c.get('heading', '')}\n\n**Chunk ID:** `{chunk_id}`\n\n{c.get('text', '')}"
available = [c.get("chunk_id") for c in page.get("chunks", [])]
log.warning("[WebSword] ws_get | unknown chunk_id=%s available=%s", chunk_id, available)
return f"❌ Unbekannte chunk_id. Verfuegbar: {available}"
async def ws_search(
self,
site_id: str,
query: str,
limit: int = 8,
__event_emitter__: Callable[[dict], Any] = None,
) -> str:
"""Volltextsuche in einem gespeicherten Manifest."""
_log_entry("ws_search", site_id=site_id, query=query)
emitter = EventEmitter(__event_emitter__)
        if not self.valves.ENABLE_SEARCH:
            return "❌ ws_search is disabled."
        manifest = _WEBSWORD_STORE.get(site_id)
        if not manifest:
            return "❌ Unknown site_id."
        q = (query or "").strip().lower()
        if not q:
            return "❌ Empty search query."
page = manifest["pages"][0]
hits = []
for c in page.get("chunks", []):
t = c.get("text") or ""
idx = t.lower().find(q)
if idx >= 0:
snippet = t[max(0, idx - 120): idx + 240]
hits.append((c.get("chunk_id"), c.get("heading", ""), snippet))
if len(hits) >= int(limit):
break
lines = [f"# 🔍 WebSword Search: {query}\n", f"**Site ID:** `{site_id}`\n"]
if not hits:
lines.append("_Keine Treffer._")
else:
for cid, heading, snippet in hits:
lines.append(f"- `{cid}` — **{heading}**")
lines.append(f" > {snippet}")
await emitter.status("✅ Suche fertig.", done=True)
_log_exit("ws_search", f"hits={len(hits)}")
return "\n".join(lines)
async def ws_list_media(
self,
site_id: str,
media_type: Optional[str] = None,
__event_emitter__: Callable[[dict], Any] = None,
) -> str:
"""Media-Elemente eines Manifests auflisten."""
_log_entry("ws_list_media", site_id=site_id, media_type=media_type)
emitter = EventEmitter(__event_emitter__)
        if not self.valves.ENABLE_MEDIA:
            return "❌ ws_list_media is disabled."
        manifest = _WEBSWORD_STORE.get(site_id)
        if not manifest:
            return "❌ Unknown site_id."
mt = (media_type or "").strip().lower()
media = manifest["pages"][0].get("media", [])
out = [m for m in media if not mt or m.get("type", "").lower() == mt]
lines = [f"# 🖼️ WebSword Media\n", f"**Site ID:** `{site_id}`\n"]
if not out:
lines.append("_Keine Media gefunden._")
else:
for m in out[:60]:
lines.append(f"- `{m.get('media_id')}` **{m.get('type')}** — {(m.get('src') or '')[:100]}")
await emitter.status("✅ Media-Liste bereit.", done=True)
_log_exit("ws_list_media", f"count={len(out)}")
return "\n".join(lines)
async def ws_embed(
self,
site_id: str,
media_id: str,
mode: str = "auto",
__event_emitter__: Callable[[dict], Any] = None,
) -> str:
"""Media-Element einbetten: Markdown img / iframe / Link."""
_log_entry("ws_embed", site_id=site_id, media_id=media_id, mode=mode)
emitter = EventEmitter(__event_emitter__)
        if not self.valves.ENABLE_MEDIA:
            return "❌ ws_embed is disabled."
        manifest = _WEBSWORD_STORE.get(site_id)
        if not manifest:
            return "❌ Unknown site_id."
target = next(
(m for m in manifest["pages"][0].get("media", []) if m.get("media_id") == media_id),
None,
)
        if not target:
            return "❌ Unknown media_id."
mtype = (target.get("type") or "").lower()
src = target.get("src") or ""
provider = (target.get("provider") or _safe_domain(src)).lower()
mode = (mode or "auto").lower().strip()
allow = set(d.lower() for d in self.valves.EMBED_ALLOWLIST)
if mode == "auto":
if mtype == "image":
mode = "img"
elif mtype == "embed" and provider in allow:
mode = "iframe"
else:
mode = "link"
if mode == "iframe":
if provider not in allow:
await emitter.status("⚠️ Provider nicht erlaubt → Link.", done=True)
title = target.get("title") or target.get("alt") or _safe_domain(src) or src
return f"[{title}]({src})"
snippet = (
f'<iframe src="{src}" loading="lazy" '
f'sandbox="allow-scripts allow-same-origin allow-presentation" '
f'referrerpolicy="no-referrer" '
f'style="width:100%;height:420px;border:0;border-radius:12px;"></iframe>'
)
await emitter.status("✅ Iframe bereit.", done=True)
_log_exit("ws_embed", "iframe")
return snippet
if mode == "img":
if mtype != "image":
return f"❌ Kein Bild (type={mtype})."
alt = target.get("alt") or target.get("title") or src
await emitter.status("✅ Bild bereit.", done=True)
_log_exit("ws_embed", "img")
return f"![{alt}]({src})"
# link
title = target.get("title") or target.get("alt") or _safe_domain(src) or src
await emitter.status("✅ Link bereit.", done=True)
_log_exit("ws_embed", "link")
return f"[{title}]({src})"
async def excalibur(
self,
url: str,
mode: str = "auto",
__event_emitter__: Callable[[dict], Any] = None,
) -> str:
"""
Deep-crawl einer ganzen Site (BFS) mit komprimierter Rueckgabe.
Scannt Startseite + alle Unterseiten der gleichen Domain bis zur
konfigurierten Tiefe. Derselbe mode-Parameter wie ws_ingest wird
auf jede gecrawlte Seite angewendet.
mode: auto | static | stealth | intercept | cdp
Fuer normale Sites: auto oder static.
Fuer Bot-geschuetzte Sites: stealth.
Fuer XHR-basierte Sites (TikTok etc.): intercept.
Fuer Sites ohne Hyperlinks: cdp.
Valves:
ENABLE_EXCALIBUR (bool, default=False) — explizit aktivieren
EXCALIBUR_MAX_DEPTH (int, default=2)
EXCALIBUR_MAX_PAGES (int, default=20)
EXCALIBUR_DELAY_S (float, default=0.5)
EXCALIBUR_CHUNK_PREVIEW_CHARS (int, default=500)
EXCALIBUR_TOKEN_WARN_THRESHOLD(int, default=50000)
Fuer Einzelseiten: ws_ingest() verwenden.
"""
_log_entry("excalibur", url=url, mode=mode)
emitter = EventEmitter(__event_emitter__)
if not self.valves.ENABLE_EXCALIBUR:
log.warning("[WebSword] excalibur | ENABLE_EXCALIBUR=False")
await emitter.error("Excalibur ist deaktiviert.")
return (
"⚔️ **Excalibur ist deaktiviert.**\n\n"
"Fuer gezielte Seiten bitte ws_ingest() nutzen:\n"
"- `ws_ingest(url)` — Einzelseite (auto)\n"
"- `ws_ingest(url, mode='stealth')` — Bot-geschuetzte Seite\n"
"- `ws_ingest(url, mode='intercept')` — XHR-basierte Seite\n"
"- `ws_ingest(url, mode='cdp')` — Site ohne Hyperlinks\n\n"
"Aktivieren: Admin-Valves → `ENABLE_EXCALIBUR=True`"
)
if not url or not isinstance(url, str):
return "❌ Bitte eine gueltige URL angeben."
base_domain = _safe_domain(url)
if not base_domain:
return "❌ Domain konnte nicht ermittelt werden."
def _is_same_domain(u: str) -> bool:
return _safe_domain(u) == base_domain
try:
queue = [(_normalize_url(url), 0)]
visited = {_normalize_url(url)}
pages_data = []
await emitter.status(
f"⚔️ Excalibur: {url} | mode={mode} | "
f"depth={self.valves.EXCALIBUR_MAX_DEPTH} "
f"max_pages={self.valves.EXCALIBUR_MAX_PAGES}"
)
while queue and len(pages_data) < self.valves.EXCALIBUR_MAX_PAGES:
current_url, depth = queue.pop(0)
_log_step("excalibur", "crawling", url=current_url, depth=depth, done=len(pages_data))
                await emitter.status(
                    f"⚔️ Page {len(pages_data)+1} | depth {depth} | {current_url}"
                )
fetch_meta, issues = self._acquire(current_url, mode=mode)
                # Collect links for the BFS frontier
if depth < self.valves.EXCALIBUR_MAX_DEPTH:
for lnk in self._extract_links_basic(fetch_meta.get("html") or ""):
href = lnk.get("href") or ""
resolved = _normalize_url(_resolve_url(href, current_url))
if (
resolved not in visited
and _is_same_domain(resolved)
and resolved.startswith("http")
and len(visited) < self.valves.EXCALIBUR_MAX_PAGES * 3
):
visited.add(resolved)
queue.append((resolved, depth + 1))
page = self._sieve(fetch_meta, issues)
pages_data.append({
"url": current_url,
"depth": depth,
"title": page.get("title") or current_url,
"toc": page.get("toc") or [],
"chunks": page.get("chunks") or [],
"media": page.get("media") or [],
"fetch_mode": fetch_meta.get("mode") or "static",
"status_code": fetch_meta.get("status_code") or 0,
})
if queue:
time.sleep(self.valves.EXCALIBUR_DELAY_S)
await emitter.status("⚔️ Excalibur: komprimiere Ergebnis …")
_log_step("excalibur", "crawl done", pages=len(pages_data))
preview_chars = int(self.valves.EXCALIBUR_CHUNK_PREVIEW_CHARS)
lines = [
"# ⚔️ Excalibur Site Map\n",
f"**Base URL:** {url}",
f"**Domain:** {base_domain}",
f"**Mode:** `{mode}`",
f"**Gecrawlt:** {len(pages_data)} Seiten | max Tiefe {self.valves.EXCALIBUR_MAX_DEPTH}",
"",
]
total_chars = 0
for i, pd in enumerate(pages_data, 1):
lines.append(f"---\n## Seite {i}: {pd['title']}")
lines.append(
f"**URL:** {pd['url']} | **Tiefe:** {pd['depth']} | "
f"**HTTP:** {pd['status_code']} | **mode:** `{pd['fetch_mode']}`"
)
if pd["toc"]:
toc_str = " · ".join(t.get("text", "") for t in pd["toc"][:8])
lines.append(f"**TOC:** {toc_str}")
if pd["chunks"]:
lines.append("**Inhalt (Vorschau):**")
for c in pd["chunks"]:
text = (c.get("text") or "")[:preview_chars]
if text:
lines.append(f"> {text.replace(chr(10), ' ')}")
total_chars += len(text)
if pd["media"]:
media_str = " · ".join(
f"{m.get('type','?')}:{(m.get('src') or '')[:60]}"
for m in pd["media"][:6]
)
lines.append(f"**Media:** {media_str}")
if len(pd["media"]) > 6:
lines.append(f" … +{len(pd['media'])-6} weitere")
lines.append("")
estimated_tokens = total_chars // 4
            lines += [
                "---",
                f"**Estimated tokens (content):** ~{estimated_tokens:,}",
            ]
            if estimated_tokens > self.valves.EXCALIBUR_TOKEN_WARN_THRESHOLD:
                lines.append(
                    "⚠️ Exceeds the threshold "
                    f"({self.valves.EXCALIBUR_TOKEN_WARN_THRESHOLD:,} tokens). "
                    "Consider ws_ingest + ws_get for more targeted queries."
                )
await emitter.status("✅ Excalibur fertig.", done=True)
_log_exit("excalibur", f"pages={len(pages_data)} ~tokens={estimated_tokens}")
return "\n".join(lines)
except Exception as e:
log.exception("[WebSword] excalibur | EXCEPTION: %s", e)
msg = f"{type(e).__name__}: {e}"
await emitter.error(msg)
return msg
async def ws_dump(
self,
site_id: str,
__event_emitter__: Callable[[dict], Any] = None,
) -> str:
"""Vollstaendiges Manifest als JSON (Debug). Nicht direkt ans LLM fuettern."""
_log_entry("ws_dump", site_id=site_id)
emitter = EventEmitter(__event_emitter__)
manifest = _WEBSWORD_STORE.get(site_id)
        if not manifest:
            return "❌ Unknown site_id."
import json
await emitter.status("✅ Dump bereit.", done=True)
_log_exit("ws_dump")
return "```json\n" + json.dumps(manifest, ensure_ascii=False, indent=2) + "\n```"