From ec74ddb7a97523e320cccd91637067df88fbfa62 Mon Sep 17 00:00:00 2001
From: H5N3RG
Date: Fri, 6 Mar 2026 10:47:35 +0000
Subject: [PATCH] Upload files to "/"

---
 websword_0.3-dev.py | 1456 +++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 1456 insertions(+)
 create mode 100644 websword_0.3-dev.py

diff --git a/websword_0.3-dev.py b/websword_0.3-dev.py
new file mode 100644
index 0000000..d502bb5
--- /dev/null
+++ b/websword_0.3-dev.py
@@ -0,0 +1,1456 @@
+"""
+title: WebSword
+description: Universal website ingestion & navigation manifest for LLMs (OWUI internal).
+author: Jan + ChatGPT + Claude
+version: 0.3-dev
+license: MIT
+requirements: requests, pydantic, trafilatura, lxml, playwright, playwright-stealth
+
+Changelog:
+    0.1.0   - initial release
+    0.1.1   - comprehensive debug logging, NoneType guards
+    0.2-dev - Markdown media rendering, Excalibur BFS crawler,
+              stealth + intercept fetch strategies (stubs)
+    0.3-dev - MAJOR REFACTOR: unified mode-based API
+              ws_stealth_scrape / ws_intercept_scrape removed
+              ws_ingest(mode=) is the central entry point:
+                  mode: auto | static | rendered | stealth | intercept | cdp
+              excalibur(mode=) uses the same mode parameter
+              _acquire() is the central dispatcher for all fetch strategies
+              CDP / accessibility tree prepared as a stub (mode='cdp')
+
+Fetch strategies (all selected via mode=):
+    static    - plain HTTP fetch, trafilatura extraction
+    rendered  - Playwright without stealth (stub)
+    stealth   - Playwright + playwright-stealth (stub)
+    intercept - Playwright + XHR/JSON sniffer (stub)
+    cdp       - Playwright + Chrome DevTools Protocol / accessibility tree (stub)
+    auto      - static; if too little visible text -> stealth as fallback (when enabled)
+
+Public API:
+    ws_ingest(url, mode='auto')   - fetch & store a page
+    ws_map(site_id)               - show structure
+    ws_get(site_id, chunk_id)     - read a chunk
+    ws_search(site_id, query)     - full-text search
+    ws_list_media(site_id)        - list media
+    ws_embed(site_id, media_id)   - embed media
+    excalibur(url, mode='auto')   - deep-crawl a whole site
+    ws_dump(site_id)              - debug: full manifest
+"""
+
+import re
+import time
+import html
+import hashlib
+import logging
+from typing import Any, Callable, Dict, List, Optional, Tuple
+
+import requests
+from pydantic import BaseModel, Field
+
+# ─────────────────────────────────────────────
+# Logging
+# ─────────────────────────────────────────────
+logging.basicConfig(level=logging.DEBUG)
+log = logging.getLogger("WebSword")
+
+VALID_MODES = ("auto", "static", "rendered", "stealth", "intercept", "cdp")
+
+
+def _log_entry(fn: str, **kwargs):
+    log.debug("[WebSword] ▶ %s | %s", fn, kwargs)
+
+
+def _log_exit(fn: str, summary: str = ""):
+    log.debug("[WebSword] ◀ %s | %s", fn, summary)
+
+
+def _log_step(fn: str, step: str, **kwargs):
+    log.debug("[WebSword] ⟳ %s | %s | %s", fn, step, kwargs)
+
+
+# ─────────────────────────────────────────────
+# EventEmitter
+# ─────────────────────────────────────────────
+class EventEmitter:
+    def __init__(self, event_emitter: Optional[Callable[[dict], Any]] = None):
+        self.event_emitter = event_emitter
+
+    async def status(self, description: str, done: bool = False):
+        if self.event_emitter:
+            await self.event_emitter(
+                {"type": "status", "data": {"description": description, "done": done}}
+            )
+
+    async def error(self, description: str):
+        if self.event_emitter:
+            await self.event_emitter(
+                {"type": "status", "data": {"description": f"❌ {description}", "done": True}}
+            )
+
+
+# ─────────────────────────────────────────────
+# 
Module-level helpers
+# ─────────────────────────────────────────────
+def _now_unix() -> int:
+    return int(time.time())
+
+
+def _sha256(text: str) -> str:
+    return hashlib.sha256(text.encode("utf-8")).hexdigest()
+
+
+def _truncate_bytes(s: str, max_bytes: int) -> str:
+    b = s.encode("utf-8", errors="ignore")
+    return s if len(b) <= max_bytes else b[:max_bytes].decode("utf-8", errors="ignore")
+
+
+def _basic_visible_text_estimate(html_text: str) -> int:
+    t = re.sub(r"(?is)<(script|style|noscript)\b.*?>.*?</\1>", " ", html_text)
+    t = re.sub(r"(?s)<[^>]+>", " ", t)
+    t = html.unescape(t)
+    return len(re.sub(r"\s+", " ", t).strip())
+
+
+def _looks_like_needs_render(html_text: str, min_len: int) -> bool:
+    text_len = _basic_visible_text_estimate(html_text)
+    needs = text_len < min_len
+    log.debug("[WebSword] _looks_like_needs_render | visible=%d min=%d -> %s", text_len, min_len, needs)
+    return needs
+
+
+def _extract_title_basic(html_text: str) -> str:
+    m = re.search(r"(?is)<title[^>]*>(.*?)</title>", html_text)
+    if not m:
+        return ""
+    return re.sub(r"\s+", " ", html.unescape(m.group(1))).strip()[:200]
+
+
+def _safe_domain(url: str) -> str:
+    m = re.match(r"^https?://([^/]+)/?", (url or "").strip())
+    return m.group(1).lower() if m else ""
+
+
+def _normalize_url(u: str) -> str:
+    return u.split("#")[0].rstrip("/")
+
+
+def _resolve_url(href: str, current: str) -> str:
+    if href.startswith("http://") or href.startswith("https://"):
+        return href
+    if href.startswith("//"):
+        scheme = current.split("://")[0] if "://" in current else "https"
+        return f"{scheme}:{href}"
+    if href.startswith("/"):
+        m = re.match(r"^(https?://[^/]+)", current)
+        return f"{m.group(1)}{href}" if m else href
+    base = current.rsplit("/", 1)[0]
+    return f"{base}/{href}"
+
+
+# ─────────────────────────────────────────────
+# In-memory store
+# ─────────────────────────────────────────────
+_WEBSWORD_STORE: Dict[str, Dict[str, Any]] = {}
+
+
+# ─────────────────────────────────────────────
+# Empty fetch_meta factory
+# ─────────────────────────────────────────────
+def _empty_fetch_meta(url: str, mode: str) -> Dict[str, Any]:
+    return {
+        "mode": mode,
+        "url": url,
+        "final_url": url,
+        "status_code": 0,
+        "content_type": "",
+        "headers": {},
+        "html": "",
+        "intercepted_json": [],
+        "accessibility_tree": None,
+        "screenshot_b64": None,
+        "fetched_at_unix": _now_unix(),
+    }
+
+
+# ─────────────────────────────────────────────
+# Tools class
+# ─────────────────────────────────────────────
+class Tools:
+
+    class Valves(BaseModel):
+
+        # HTTP
+        USER_AGENT: str = Field(
+            default="WebSword/0.3 (+open-webui)",
+            description="User-Agent fuer HTTP-Requests.",
+        )
+        TIMEOUT_S: int = Field(default=15, description="HTTP-Timeout in Sekunden.")
+        MAX_HTML_BYTES: int = Field(
+            default=2_000_000, description="Max. Bytes vom gefetchten HTML."
+        )
+
+        # Feature flags
+        ENABLE_INGEST: bool = Field(default=True, description="ws_ingest erlauben.")
+        ENABLE_MAP: bool = Field(default=True, description="ws_map erlauben.")
+        ENABLE_GET: bool = Field(default=True, description="ws_get erlauben.")
+        ENABLE_SEARCH: bool = Field(default=True, description="ws_search erlauben.")
+        ENABLE_MEDIA: bool = Field(default=True, description="ws_list_media / ws_embed erlauben.")
+        ENABLE_EXCALIBUR: bool = Field(
+            default=False,
+            description="Excalibur Deep-crawl. 
DEAKTIVIERT by default — kann viele Requests ausloesen.", + ) + + # Playwright-Modi (alle default=False) + ENABLE_RENDERED_FETCH: bool = Field( + default=False, + description="mode='rendered': Playwright ohne stealth. Benoetigt: playwright install chromium.", + ) + ENABLE_STEALTH_SCRAPE: bool = Field( + default=False, + description="mode='stealth': Playwright + playwright-stealth. Benoetigt: playwright install chromium.", + ) + ENABLE_INTERCEPT_FETCH: bool = Field( + default=False, + description=( + "mode='intercept': Playwright XHR/JSON-Sniffer. " + "Plattformunabhaengig fuer SPAs (TikTok, Instagram, Reddit ...). " + "Benoetigt: playwright install chromium." + ), + ) + ENABLE_CDP_FETCH: bool = Field( + default=False, + description=( + "mode='cdp': Playwright + Chrome DevTools Protocol (Accessibility Tree). " + "Macht Sites ohne Hyperlinks navigierbar. " + "Benoetigt: playwright install chromium." + ), + ) + ENABLE_STEALTH_SCREENSHOT: bool = Field( + default=False, + description="Bei stealth/cdp/intercept: Screenshot als base64-PNG anhaengen.", + ) + + # Playwright-Timeouts (gemeinsam fuer alle Modi) + PLAYWRIGHT_TIMEOUT_MS: int = Field( + default=20000, + description="Playwright page-load Timeout in ms (gilt fuer alle Playwright-Modi).", + ) + PLAYWRIGHT_WAIT_UNTIL: str = Field( + default="networkidle", + description="Playwright wait_until: networkidle | domcontentloaded | load | commit.", + ) + + # Intercept-spezifisch + INTERCEPT_MAX_RESPONSES: int = Field( + default=20, description="Max. JSON-Responses die der Interceptor sammelt.", + ) + INTERCEPT_MIN_JSON_BYTES: int = Field( + default=64, description="Min. Bytes einer JSON-Response (filtert triviale).", + ) + + # Auto-mode + AUTO_RENDER_MIN_TEXT_LEN: int = Field( + default=500, + description="auto-mode: Wenn sichtbarer Text kuerzer, wird stealth versucht (falls aktiv).", + ) + + # Extraktion + TRAFILATURA_FAVOR: bool = Field( + default=True, description="trafilatura bevorzugen wenn Qualitaet ausreichend.", + ) + TRAFI_MIN_TEXT_LEN: int = Field( + default=800, description="Min. Textlaenge damit trafilatura als erfolgreich gilt.", + ) + MAX_CHUNK_CHARS: int = Field(default=8000, description="Max. Zeichen pro Chunk.") + CHUNK_TARGET_CHARS: int = Field(default=2200, description="Ziel-Chunk-Groesse.") + CHUNK_OVERLAP_CHARS: int = Field(default=120, description="Ueberlappung zwischen Chunks.") + MAX_CHUNKS: int = Field(default=12, description="Max. Chunks pro Seite.") + MAX_MEDIA_ITEMS: int = Field(default=40, description="Max. Media-Items.") + MAX_LINK_ITEMS: int = Field(default=60, description="Max. Links.") + + # Embed + EMBED_ALLOWLIST: List[str] = Field( + default_factory=lambda: [ + "youtube.com", "www.youtube.com", "youtu.be", + "player.vimeo.com", "vimeo.com", + "soundcloud.com", "w.soundcloud.com", + "open.spotify.com", + ], + description="Erlaubte iframe-Domains.", + ) + + # Excalibur + EXCALIBUR_MAX_DEPTH: int = Field(default=2, description="Excalibur: Max. Crawl-Tiefe.") + EXCALIBUR_MAX_PAGES: int = Field(default=20, description="Excalibur: Max. Seiten.") + EXCALIBUR_DELAY_S: float = Field(default=0.5, description="Excalibur: Pause zwischen Requests in Sekunden.") + EXCALIBUR_CHUNK_PREVIEW_CHARS: int = Field( + default=500, description="Excalibur: Max. 
Zeichen pro Chunk in der komprimierten Ausgabe.", + ) + EXCALIBUR_TOKEN_WARN_THRESHOLD: int = Field( + default=50000, description="Excalibur: Warnung wenn geschaetzte Token diesen Wert uebersteigen.", + ) + + def __init__(self): + self.valves = self.Valves() + + # ───────────────────────────────────────── + # Layer A: Fetch-Strategien (intern) + # ───────────────────────────────────────── + + def _fetch_static(self, url: str) -> Tuple[Dict[str, Any], List[Dict[str, Any]]]: + _log_entry("_fetch_static", url=url) + issues = [] + headers = { + "User-Agent": self.valves.USER_AGENT, + "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8", + } + try: + r = requests.get(url, headers=headers, timeout=self.valves.TIMEOUT_S, allow_redirects=True) + ct = r.headers.get("Content-Type", "") + raw = r.text if r.encoding else r.content.decode("utf-8", errors="ignore") + raw = _truncate_bytes(raw, self.valves.MAX_HTML_BYTES) + _log_step("_fetch_static", "response", status=r.status_code, ct=ct, html_len=len(raw)) + if "text/html" not in ct and "application/xhtml" not in ct: + issues.append({"type": "non_html_content_type", "detail": ct}) + meta = _empty_fetch_meta(url, "static") + meta.update({ + "final_url": str(r.url), + "status_code": int(r.status_code), + "content_type": ct, + "headers": dict(r.headers), + "html": raw, + }) + _log_exit("_fetch_static", f"status={r.status_code} len={len(raw)}") + return meta, issues + except Exception as e: + log.exception("[WebSword] _fetch_static | EXCEPTION: %s", e) + issues.append({"type": "fetch_static_failed", "detail": repr(e)}) + return _empty_fetch_meta(url, "static"), issues + + def _fetch_rendered(self, url: str) -> Tuple[Dict[str, Any], List[Dict[str, Any]]]: + """ + Playwright ohne stealth — Stub v0.3. + + TODO(v0.3): + async with async_playwright() as pw: + browser = await pw.chromium.launch(headless=True) + page = await browser.new_page() + resp = await page.goto(url, + wait_until=self.valves.PLAYWRIGHT_WAIT_UNTIL, + timeout=self.valves.PLAYWRIGHT_TIMEOUT_MS) + raw_html = await page.content() + await browser.close() + """ + _log_entry("_fetch_rendered", url=url) + issues = [] + if not self.valves.ENABLE_RENDERED_FETCH: + issues.append({"type": "rendered_disabled", "detail": "ENABLE_RENDERED_FETCH=False"}) + return _empty_fetch_meta(url, "rendered"), issues + issues.append({"type": "rendered_not_implemented", "detail": "stub v0.3"}) + return _empty_fetch_meta(url, "rendered"), issues + + def _fetch_stealth(self, url: str) -> Tuple[Dict[str, Any], List[Dict[str, Any]]]: + """ + Playwright + playwright-stealth — Stub v0.3. + + Umgeht Standard-Headless-Detection, Canvas-Fingerprinting etc. + Grenzen: Cloudflare Turnstile, hCaptcha, harte Login-Walls. 
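+
+        Quick sanity check for a stealth setup (sketch, not part of the tool;
+        assumes playwright + playwright-stealth are installed):
+
+            import asyncio
+            from playwright.async_api import async_playwright
+            from playwright_stealth import stealth_async
+
+            async def check():
+                async with async_playwright() as pw:
+                    browser = await pw.chromium.launch(headless=True)
+                    page = await browser.new_page()
+                    await stealth_async(page)
+                    await page.goto("https://example.com")
+                    # stealth patches navigator.webdriver to be falsy
+                    print(await page.evaluate("navigator.webdriver"))
+                    await browser.close()
+
+            asyncio.run(check())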
+ + TODO(v0.3): + from playwright.async_api import async_playwright + from playwright_stealth import stealth_async + async with async_playwright() as pw: + browser = await pw.chromium.launch(headless=True) + ctx = await browser.new_context( + user_agent=self.valves.USER_AGENT, + viewport={"width": 1280, "height": 800}, + locale="de-DE", + ) + page = await ctx.new_page() + await stealth_async(page) + resp = await page.goto(url, + wait_until=self.valves.PLAYWRIGHT_WAIT_UNTIL, + timeout=self.valves.PLAYWRIGHT_TIMEOUT_MS) + raw_html = await page.content() + screenshot_b64 = None + if self.valves.ENABLE_STEALTH_SCREENSHOT: + import base64 + screenshot_b64 = base64.b64encode( + await page.screenshot(full_page=True)).decode() + await browser.close() + meta = _empty_fetch_meta(url, "stealth") + meta.update({"html": raw_html, "status_code": resp.status, + "screenshot_b64": screenshot_b64, "final_url": page.url}) + """ + _log_entry("_fetch_stealth", url=url) + issues = [] + if not self.valves.ENABLE_STEALTH_SCRAPE: + issues.append({"type": "stealth_disabled", "detail": "ENABLE_STEALTH_SCRAPE=False"}) + return _empty_fetch_meta(url, "stealth"), issues + issues.append({"type": "stealth_not_implemented", "detail": "stub v0.3"}) + return _empty_fetch_meta(url, "stealth"), issues + + async def _fetch_intercept(self, url: str) -> Tuple[Dict[str, Any], List[Dict[str, Any]]]: + """ + Playwright + XHR/JSON-Sniffer — Stub v0.3. + + Lauscht passiv auf alle JSON-API-Responses waehrend die Seite laedt. + Plattformunabhaengig: TikTok, Instagram, Twitter/X, Reddit, Amazon, ... + + TODO(v0.3): + intercepted = [] + async def handle_response(response): + ct = response.headers.get("content-type", "") + if "application/json" not in ct: + return + try: + body = await response.body() + if len(body) >= self.valves.INTERCEPT_MIN_JSON_BYTES: + intercepted.append({ + "url": response.url, + "status": response.status, + "body": body.decode("utf-8", errors="ignore"), + }) + except Exception as e: + log.debug("[WebSword] intercept response error: %s", e) + + async with async_playwright() as pw: + browser = await pw.chromium.launch(headless=True) + page = await browser.new_page() + await stealth_async(page) + page.on("response", handle_response) + resp = await page.goto(url, + wait_until=self.valves.PLAYWRIGHT_WAIT_UNTIL, + timeout=self.valves.PLAYWRIGHT_TIMEOUT_MS) + raw_html = await page.content() + await browser.close() + meta = _empty_fetch_meta(url, "intercept") + meta.update({"html": raw_html, "status_code": resp.status if resp else 0, + "intercepted_json": intercepted[:self.valves.INTERCEPT_MAX_RESPONSES], + "final_url": page.url}) + """ + _log_entry("_fetch_intercept", url=url) + issues = [] + if not self.valves.ENABLE_INTERCEPT_FETCH: + issues.append({"type": "intercept_disabled", "detail": "ENABLE_INTERCEPT_FETCH=False"}) + return _empty_fetch_meta(url, "intercept"), issues + issues.append({"type": "intercept_not_implemented", "detail": "stub v0.3"}) + return _empty_fetch_meta(url, "intercept"), issues + + def _fetch_cdp(self, url: str) -> Tuple[Dict[str, Any], List[Dict[str, Any]]]: + """ + Playwright + Chrome DevTools Protocol — Stub v0.3. 
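+
+        For orientation: a node of the CDP accessibility tree looks roughly
+        like this (abbreviated; the exact schema is defined by the CDP
+        Accessibility domain):
+
+            {"nodeId": "7", "role": {"value": "button"}, "name": {"value": "Submit"}}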
+ + Macht Sites ohne Hyperlinks navigierbar: + - Accessibility Tree: semantische Struktur unabhaengig vom visuellen DOM + - Alle interaktiven Elemente (Buttons, ARIA-Roles, onClick-Handler) + - Shadow DOM durchdringen + - Kein klassisches Hyperlink-System noetig + + TODO(v0.3): + async with async_playwright() as pw: + browser = await pw.chromium.launch(headless=True) + page = await browser.new_page() + await stealth_async(page) + client = await page.context.new_cdp_session(page) + resp = await page.goto(url, + wait_until=self.valves.PLAYWRIGHT_WAIT_UNTIL, + timeout=self.valves.PLAYWRIGHT_TIMEOUT_MS) + raw_html = await page.content() + + # Accessibility Tree + ax_tree = await client.send("Accessibility.getFullAXTree") + + # Alle interaktiven Elemente inventarisieren + clickables = await page.query_selector_all( + "button, [role='button'], [role='link'], [role='menuitem'], " + "[tabindex], [onclick], a, input[type='submit']" + ) + interactive = [] + for el in clickables: + try: + label = await el.get_attribute("aria-label") or await el.inner_text() + box = await el.bounding_box() + interactive.append({ + "label": (label or "").strip()[:80], + "visible": box is not None, + "tag": await el.evaluate("e => e.tagName"), + }) + except Exception: + pass + + screenshot_b64 = None + if self.valves.ENABLE_STEALTH_SCREENSHOT: + import base64 + screenshot_b64 = base64.b64encode( + await page.screenshot(full_page=True)).decode() + await browser.close() + + meta = _empty_fetch_meta(url, "cdp") + meta.update({ + "html": raw_html, + "status_code": resp.status if resp else 0, + "final_url": page.url, + "accessibility_tree": ax_tree, + "interactive_elements": interactive, + "screenshot_b64": screenshot_b64, + }) + """ + _log_entry("_fetch_cdp", url=url) + issues = [] + if not self.valves.ENABLE_CDP_FETCH: + issues.append({"type": "cdp_disabled", "detail": "ENABLE_CDP_FETCH=False"}) + return _empty_fetch_meta(url, "cdp"), issues + issues.append({"type": "cdp_not_implemented", "detail": "stub v0.3"}) + return _empty_fetch_meta(url, "cdp"), issues + + # ───────────────────────────────────────── + # Layer B: _acquire() — zentraler Dispatcher + # ───────────────────────────────────────── + + def _acquire(self, url: str, mode: str) -> Tuple[Dict[str, Any], List[Dict[str, Any]]]: + """ + Zentraler Fetch-Dispatcher fuer alle Strategien. 
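+
+        Always returns a (fetch_meta, issues) tuple, e.g.:
+
+            meta, issues = self._acquire("https://example.com", mode="auto")
+            html_text = meta.get("html") or ""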
+
+        mode: auto | static | rendered | stealth | intercept | cdp
+
+        Fallback chains:
+            auto      -> static; too little text -> stealth (when enabled) -> static
+            rendered  -> static (when rendered comes back empty)
+            stealth   -> static (when stealth comes back empty)
+            intercept -> stealth (when enabled) -> static
+            cdp       -> stealth (when enabled) -> static
+        """
+        _log_entry("_acquire", url=url, mode=mode)
+        mode = (mode or "auto").lower().strip()
+        if mode not in VALID_MODES:
+            log.warning("[WebSword] _acquire | unknown mode '%s', using auto", mode)
+            mode = "auto"
+
+        def _fallback_static(reason: str, prior_issues: list) -> Tuple[Dict, List]:
+            log.warning("[WebSword] _acquire | fallback to static: %s", reason)
+            meta, issues = self._fetch_static(url)
+            issues = prior_issues + issues
+            issues.append({"type": "acquire_fallback_static", "detail": reason})
+            return meta, issues
+
+        def _fallback_stealth_then_static(reason: str, prior_issues: list) -> Tuple[Dict, List]:
+            if self.valves.ENABLE_STEALTH_SCRAPE:
+                log.warning("[WebSword] _acquire | fallback to stealth: %s", reason)
+                meta, issues = self._fetch_stealth(url)
+                issues = prior_issues + issues
+                issues.append({"type": "acquire_fallback_stealth", "detail": reason})
+                if (meta.get("html") or "").strip():
+                    return meta, issues
+            return _fallback_static(reason + "_then_static", prior_issues)
+
+        # static
+        if mode == "static":
+            meta, issues = self._fetch_static(url)
+            issues.append({"type": "acquire_mode", "detail": "static"})
+            _log_exit("_acquire", "static")
+            return meta, issues
+
+        # rendered
+        if mode == "rendered":
+            meta, issues = self._fetch_rendered(url)
+            if not (meta.get("html") or "").strip():
+                return _fallback_static("rendered_empty", issues)
+            issues.append({"type": "acquire_mode", "detail": "rendered"})
+            _log_exit("_acquire", "rendered")
+            return meta, issues
+
+        # stealth
+        if mode == "stealth":
+            meta, issues = self._fetch_stealth(url)
+            if not (meta.get("html") or "").strip():
+                return _fallback_static("stealth_empty", issues)
+            issues.append({"type": "acquire_mode", "detail": "stealth"})
+            _log_exit("_acquire", "stealth")
+            return meta, issues
+
+        # intercept (async fetch invoked from this sync dispatcher)
+        if mode == "intercept":
+            import asyncio
+            try:
+                loop = asyncio.get_event_loop()
+                if loop.is_running():
+                    import concurrent.futures
+                    with concurrent.futures.ThreadPoolExecutor() as pool:
+                        future = pool.submit(asyncio.run, self._fetch_intercept(url))
+                        meta, issues = future.result(
+                            timeout=self.valves.PLAYWRIGHT_TIMEOUT_MS / 1000 + 5)
+                else:
+                    meta, issues = loop.run_until_complete(self._fetch_intercept(url))
+            except Exception as e:
+                log.error("[WebSword] _acquire | intercept call failed: %s", e)
+                meta = _empty_fetch_meta(url, "intercept")
+                issues = [{"type": "intercept_call_failed", "detail": repr(e)}]
+            if not (meta.get("html") or "").strip() and not meta.get("intercepted_json"):
+                return _fallback_stealth_then_static("intercept_empty", issues)
+            issues.append({"type": "acquire_mode", "detail": "intercept"})
+            _log_exit("_acquire", "intercept")
+            return meta, issues
+
+        # cdp
+        if mode == "cdp":
+            meta, issues = self._fetch_cdp(url)
+            if not (meta.get("html") or "").strip():
+                return _fallback_stealth_then_static("cdp_empty", issues)
+            issues.append({"type": "acquire_mode", "detail": "cdp"})
+            _log_exit("_acquire", "cdp")
+            return meta, issues
+
+        # auto
+        meta, issues = self._fetch_static(url)
+        _log_step("_acquire", "auto static done",
+                  status=meta.get("status_code"), html_len=len(meta.get("html") or ""))
+
+        if meta.get("status_code") in (401, 403):
+            issues.append({"type": "access_restricted", "detail": f"status={meta['status_code']}"})
+
+        html_text = meta.get("html") or ""
+        if _looks_like_needs_render(html_text, self.valves.AUTO_RENDER_MIN_TEXT_LEN):
+            if self.valves.ENABLE_STEALTH_SCRAPE:
+                log.debug("[WebSword] _acquire | auto: static insufficient, trying stealth")
+                s_meta, s_issues = self._fetch_stealth(url)
+                if (s_meta.get("html") or "").strip():
+                    s_issues.extend(issues)
+                    s_issues.append({"type": "auto_used_stealth", "detail": "static_insufficient"})
+                    _log_exit("_acquire", "auto->stealth")
+                    return s_meta, s_issues
+            issues.append({"type": "auto_used_static", "detail": "stealth_inactive_or_failed"})
+        else:
+            issues.append({"type": "auto_used_static", "detail": "sufficient_text"})
+
+        _log_exit("_acquire", "auto->static")
+        return meta, issues
+
+    # ─────────────────────────────────────────
+    # Layer C: Extraction
+    # ─────────────────────────────────────────
+
+    def _extract_links_basic(self, html_text: str) -> List[Dict[str, str]]:
+        out: List[Dict[str, str]] = []
+        for m in re.finditer(r'(?is)<a\b[^>]*href=["\']([^"\']+)["\']', html_text or ""):
+            href = (m.group(1) or "").strip()
+            if not href or href.startswith("#") or href.lower().startswith("javascript:"):
+                continue
+            out.append({"href": href})
+            if len(out) >= self.valves.MAX_LINK_ITEMS:
+                break
+        return out
+
+    def _extract_media_basic(self, html_text: str) -> List[Dict[str, Any]]:
+        media: List[Dict[str, Any]] = []
+        for m in re.finditer(
+            r'(?is)<img\b[^>]*src=["\']([^"\']+)["\']([^>]*)>', html_text or ""
+        ):
+            src = (m.group(1) or "").strip()
+            rest = m.group(2) or ""
+            alt_m = re.search(r'(?is)\balt=["\']([^"\']*)["\']', rest)
+            alt = (alt_m.group(1) if alt_m else "").strip()
+            media.append({"media_id": None, "type": "image", "src": src, "alt": alt})
+            if len(media) >= self.valves.MAX_MEDIA_ITEMS:
+                break
+        if len(media) < self.valves.MAX_MEDIA_ITEMS:
+            for m in re.finditer(
+                r'(?is)<iframe\b[^>]*src=["\']([^"\']+)["\']', html_text or ""
+            ):
+                src = (m.group(1) or "").strip()
+                media.append({
+                    "media_id": None, "type": "embed",
+                    "src": src, "provider": _safe_domain(src),
+                })
+                if len(media) >= self.valves.MAX_MEDIA_ITEMS:
+                    break
+        return media
+
+    def _make_chunk_id(self, prefix: str, text: str, idx: int) -> str:
+        h = _sha256(f"{prefix}|{idx}|{text[:120]}")[:10]
+        return f"c{idx}_{h}"
+
+    def _chunk_text(self, text: str) -> List[str]:
+        _log_entry("_chunk_text", text_len=len(text or ""))
+        t = (text or "").strip()
+        if not t:
+            return []
+        paras = [p.strip() for p in re.split(r"\n{2,}", t) if p.strip()]
+        if not paras:
+            return []
+
+        chunks: List[str] = []
+        buf: List[str] = []
+        buf_len = 0
+        target = int(self.valves.CHUNK_TARGET_CHARS)
+        overlap = int(self.valves.CHUNK_OVERLAP_CHARS)
+
+        def flush():
+            nonlocal buf, buf_len
+            if not buf:
+                return
+            chunk = "\n\n".join(buf).strip()
+            if chunk:
+                chunks.append(chunk)
+            buf, buf_len = [], 0
+
+        for p in paras:
+            plen = len(p)
+            if buf_len + plen + 2 <= target or not buf:
+                buf.append(p)
+                buf_len += plen + 2
+            else:
+                flush()
+                buf.append(p)
+                buf_len = plen + 2
+            if len(chunks) >= int(self.valves.MAX_CHUNKS):
+                break
+
+        flush()
+
+        # carry a short tail of each chunk into the next one (context overlap)
+        if overlap > 0 and len(chunks) > 1:
+            out = [chunks[0]]
+            for i in range(1, len(chunks)):
+                tail = out[-1][-overlap:]
+                out.append((tail + "\n\n" + chunks[i]).strip())
+            chunks = out
+
+        result = chunks[:int(self.valves.MAX_CHUNKS)]
+        _log_exit("_chunk_text", f"chunks={len(result)}")
+        return result
+
+    def _extract_headings_best_effort(self, html_text: str) -> List[str]:
+        if not html_text:
+            return []
+        m = re.search(r"(?is)<(main|article)\b[^>]*>(.*?)</\1>", html_text)
+        scope = m.group(2) if m else html_text
+        headings: List[str] = []
+        for m in re.finditer(r"(?is)<h([1-6])[^>]*>(.*?)</h\1>", scope):
+            raw = re.sub(r"(?s)<[^>]+>", " ", m.group(2))
+            txt = re.sub(r"\s+", " ", html.unescape(raw)).strip()
+            if txt and len(txt) >= 3:
+                headings.append(txt[:160])
+            if len(headings) >= 24:
+                break
+        seen = set()
+        out = []
+        for h in headings:
+            key = h.lower()
+            if key not in seen:
+                seen.add(key)
+                out.append(h)
+        return out
+
+    def _extractor_basic_dom(self, fetch_meta: Dict, issues: List) -> Dict:
+        _log_entry("_extractor_basic_dom")
+        if not fetch_meta:
+            log.error("[WebSword] _extractor_basic_dom | fetch_meta empty")
+            return {
+                "title": "", "toc": [], "chunks": [], "links": [], "media": [],
+                "issues": list(issues) + [{"type": "basic_dom_no_fetch_meta", "detail": "empty"}],
+            }
+        html_text = fetch_meta.get("html") or ""
+        title = _extract_title_basic(html_text) or fetch_meta.get("final_url", fetch_meta.get("url", ""))
+        cleaned = re.sub(r"(?is)<(script|style|noscript)\b.*?>.*?</\1>", " ", html_text)
+        txt = re.sub(r"\s+", " ", html.unescape(re.sub(r"(?s)<[^>]+>", " ", cleaned))).strip()
+        chunk_text = txt[:self.valves.MAX_CHUNK_CHARS]
+        _log_exit("_extractor_basic_dom", f"title={title[:40]} text_len={len(txt)}")
+        return {
+            "title": title,
+            "toc": [],
+            "chunks": [{"chunk_id": "c1", "heading": title[:120],
+                        "text": chunk_text, "selectors": [], "links": []}],
+            "links": self._extract_links_basic(html_text),
+            "media": self._extract_media_basic(html_text),
+            "issues": list(issues),
+        }
+
+    def _extractor_readability(self, fetch_meta: Dict, issues: List) -> Dict:
+        _log_entry("_extractor_readability")
+        if not fetch_meta:
+            log.error("[WebSword] _extractor_readability | fetch_meta empty")
+            return {"issues": list(issues) + [{"type": "readability_no_fetch_meta", "detail": "empty"}]}
+
+        html_text = fetch_meta.get("html") or ""
+        final_url = fetch_meta.get("final_url") or fetch_meta.get("url") or ""
+
+        try:
+            import trafilatura
+            from trafilatura.metadata import extract_metadata
+        except Exception as e:
+            log.error("[WebSword] _extractor_readability | trafilatura missing: %s", e)
+            return {"issues": list(issues) + [{"type": "trafilatura_missing", "detail": repr(e)}]}
+
+        title, lang = "", ""
+        try:
+            md = extract_metadata(html_text, url=final_url)
+            if md:
+                title = (md.title or "").strip()
+                lang = (md.language or "").strip()
+        except Exception as e:
+            issues = list(issues) + [{"type": "trafilatura_metadata_failed", "detail": repr(e)}]
+
+        try:
+            extracted = (trafilatura.extract(
+                html_text, url=final_url,
+                include_comments=False, include_tables=True,
+                include_links=False, favor_precision=True,
+                output_format="txt",
+            ) or "").strip()
+        except Exception as e:
+            log.error("[WebSword] _extractor_readability | extract failed: %s", e)
+            return {"issues": list(issues) + [{"type": "trafilatura_extract_failed", "detail": repr(e)}]}
+
+        _log_step("_extractor_readability", "extracted", extracted_len=len(extracted))
+
+        if len(extracted) < int(self.valves.TRAFI_MIN_TEXT_LEN):
+            return {
+                "title": title or _extract_title_basic(html_text) or final_url,
+                "lang": lang, "toc": [],
+                "chunks": ([{"chunk_id": "c1", "heading": (title or final_url)[:120],
+                             "text": extracted[:self.valves.MAX_CHUNK_CHARS],
+                             "selectors": [], "links": []}] if extracted else []),
+                "links": self._extract_links_basic(html_text),
+                "media": self._extract_media_basic(html_text),
+                "issues": list(issues) + [{"type": "trafilatura_low_text",
+                                           "detail": f"len={len(extracted)} < {self.valves.TRAFI_MIN_TEXT_LEN}"}],
+            }
+
+        # success path: chunk the extracted text, use headings as TOC
+        headings = self._extract_headings_best_effort(html_text)
+        chunks: List[Dict[str, Any]] = []
+        for i, part in enumerate(self._chunk_text(extracted), 1):
+            heading = headings[i - 1] if i - 1 < len(headings) else (title or final_url)
+            chunks.append({
+                "chunk_id": self._make_chunk_id(final_url, part, i),
+                "heading": heading[:120],
+                "text": part,
+                "selectors": [],
+                "links": [],
+            })
+
+        _log_exit("_extractor_readability", f"chunks={len(chunks)}")
+        return {
+            "title": title or _extract_title_basic(html_text) or final_url,
+            "lang": lang,
+            "toc": [{"text": h} for h in headings],
+            "chunks": chunks,
+            "links": self._extract_links_basic(html_text),
+            "media": self._extract_media_basic(html_text),
+            "issues": list(issues),
+        }
+
+    # ─────────────────────────────────────────
+    # Layer D: Sieve (extractor selection)
+    # ─────────────────────────────────────────
+
+    def _sieve(self, fetch_meta: Dict, issues: List) -> Dict:
+        _log_entry("_sieve",
+                   html_len=len((fetch_meta or {}).get("html") or ""),
+                   intercepted=len((fetch_meta or {}).get("intercepted_json") or []),
+                   has_ax_tree=bool((fetch_meta or {}).get("accessibility_tree")))
+
+        if not fetch_meta:
+            log.error("[WebSword] _sieve | fetch_meta empty")
+            return {"title": "", "toc": [], "chunks": [], "links": [], "media": [],
+                    "issues": list(issues) + [{"type": "sieve_no_fetch_meta", "detail": "empty"}]}
+
+        # inject intercepted JSON as pseudo-HTML
+        intercepted = (fetch_meta or {}).get("intercepted_json") or []
+        if intercepted and isinstance(fetch_meta, dict):
+            import json as _json
+            json_blob = "\n\n".join(
+                _json.dumps(item.get("body") or "", ensure_ascii=False)[:1200]
+                for item in intercepted[:10] if item.get("body")
+            )
+            if json_blob:
+                fetch_meta = dict(fetch_meta)
+                fetch_meta["html"] = (fetch_meta.get("html") or "") + \
+                    "\n\n<!-- websword:intercepted-json -->\n" + json_blob + "\n<!-- /websword:intercepted-json -->"
+                log.debug("[WebSword] _sieve | injected %d JSON blocks", len(intercepted))
+
+        # inject accessibility tree
+        ax_tree = (fetch_meta or {}).get("accessibility_tree")
+        if ax_tree and isinstance(fetch_meta, dict):
+            import json as _json
+            ax_text = _json.dumps(ax_tree, ensure_ascii=False)[:3000]
+            fetch_meta = dict(fetch_meta)
+            fetch_meta["html"] = (fetch_meta.get("html") or "") + \
+                "\n\n<!-- websword:accessibility-tree -->\n" + ax_text + "\n<!-- /websword:accessibility-tree -->"
+            log.debug("[WebSword] _sieve | injected accessibility tree")
+
+        # score the candidate extractions
+        basic = self._extractor_basic_dom(fetch_meta, issues)
+        basic_text = (basic.get("chunks") or [{}])[0].get("text") or "" if basic.get("chunks") else ""
+        basic_score = min(1.0, len(basic_text) / 2000.0)
+
+        rd = self._extractor_readability(fetch_meta, issues)
+        rd_chunks = rd.get("chunks") or [] if isinstance(rd, dict) else []
+        rd_text_len = sum(len((c or {}).get("text") or "") for c in rd_chunks)
+        rd_score = min(1.2, rd_text_len / 3500.0) if rd_text_len else 0.0
+        if isinstance(rd, dict) and rd.get("toc"):
+            rd_score += 0.15
+        if self.valves.TRAFILATURA_FAVOR:
+            rd_score += 0.10
+
+        name, score, chosen = max(
+            [("basic_dom", basic_score, basic), ("trafilatura", rd_score, rd)],
+            key=lambda x: x[1]
+        )
+        _log_step("_sieve", "winner", name=name, score=round(score, 3))
+
+        if not chosen or not isinstance(chosen, dict) or not chosen.get("chunks"):
+            log.warning("[WebSword] _sieve | winner '%s' empty, forcing basic_dom", name)
+            chosen = basic
+            chosen.setdefault("issues", [])
+            chosen["issues"].append({"type": "sieve_forced_basic", "detail": "winner_empty"})
+
+        for i, m in enumerate(chosen.get("media") or [], 1):
+            m["media_id"] = m.get("media_id") or f"m{i}"
+
+        chosen.setdefault("issues", [])
+        chosen["issues"].append({"type": "sieve_choice",
+                                 "detail": {"name": name, "score": round(score, 3)}})
+        _log_exit("_sieve", f"chosen={name} chunks={len(chosen.get('chunks') or [])}")
+        return chosen
+
+    # ─────────────────────────────────────────
+    # Layer E: Manifest
+    # ─────────────────────────────────────────
+
+    def _build_manifest(self, fetch_meta: Dict, issues: List) -> Dict:
+        _log_entry("_build_manifest")
+        page = self._sieve(fetch_meta, issues)
+        site_id = _sha256(
+            f"{fetch_meta.get('final_url')}|{fetch_meta.get('fetched_at_unix')}|WebSword0.3"
+        )[:16]
+        manifest = {
+            "producer": "WebSword",
+            "schema_version": "0.3",
+            "site_id": site_id,
+            "base_url": fetch_meta.get("final_url", fetch_meta.get("url")),
+            "fetched_at_unix": fetch_meta.get("fetched_at_unix"),
+            "pages": [{
+                "page_id": "p1",
+                "url": fetch_meta.get("final_url", fetch_meta.get("url")),
+                "title": page.get("title", ""),
+                "lang": page.get("lang", ""),
+                "toc": page.get("toc", []),
+                "chunks": page.get("chunks", []),
+                "links": page.get("links", []),
+                "media": page.get("media", []),
+                "issues": page.get("issues", []),
+                "fetch": {
+                    "mode": fetch_meta.get("mode"),
+                    "status_code": fetch_meta.get("status_code"),
+                    "content_type": fetch_meta.get("content_type", ""),
+                    "intercepted_json_count": len(fetch_meta.get("intercepted_json") or []),
+                    "has_accessibility_tree": bool(fetch_meta.get("accessibility_tree")),
+                    "has_screenshot": bool(fetch_meta.get("screenshot_b64")),
+                },
+            }],
+            "capabilities": {
+                "modes_available": list(VALID_MODES),
+                "playwright_modes_enabled": [
+                    m for m, v in [
+                        ("rendered", self.valves.ENABLE_RENDERED_FETCH),
+                        ("stealth", self.valves.ENABLE_STEALTH_SCRAPE),
+                        ("intercept", self.valves.ENABLE_INTERCEPT_FETCH),
+                        ("cdp", self.valves.ENABLE_CDP_FETCH),
+                    ] if v
+                ],
+            },
+        }
+        _log_exit("_build_manifest", f"site_id={site_id}")
+        return manifest
+
+    # ─────────────────────────────────────────
+    # Public API
+    # ─────────────────────────────────────────
+
+    async def ws_ingest(
+        self,
+        url: str,
+        mode: str = "auto",
+        __event_emitter__: Callable[[dict], Any] = None,
+    ) -> str:
+        """
+        Seite fetchen, extrahieren und als Manifest speichern.
+ + mode: auto | static | stealth | intercept | cdp | rendered + + auto - statisch; bei zu wenig Text -> stealth (wenn aktiv) + static - normaler HTTP-Fetch + stealth - Playwright + playwright-stealth (Bot-Schutz umgehen) + intercept - Playwright XHR/JSON-Sniffer (TikTok, Instagram, Reddit ...) + cdp - Playwright + Accessibility Tree (Sites ohne Hyperlinks) + rendered - Playwright ohne stealth (einfache SPAs) + + Playwright-Modi benoetigen das jeweilige ENABLE_*=True Valve + sowie 'playwright install chromium' im Container. + """ + _log_entry("ws_ingest", url=url, mode=mode) + emitter = EventEmitter(__event_emitter__) + + if not self.valves.ENABLE_INGEST: + return "❌ ws_ingest ist deaktiviert." + if not url or not isinstance(url, str): + return "❌ Bitte eine gueltige URL angeben." + + try: + await emitter.status(f"🗡️ WebSword: mode={mode} → {url}") + fetch_meta, issues = self._acquire(url, mode=mode) + _log_step("ws_ingest", "acquire done", + status=fetch_meta.get("status_code"), + html_len=len(fetch_meta.get("html") or ""), + mode=fetch_meta.get("mode")) + + await emitter.status("🧪 Extrahiere Struktur …") + manifest = self._build_manifest(fetch_meta, issues) + site_id = manifest["site_id"] + _WEBSWORD_STORE[site_id] = manifest + page = manifest["pages"][0] + fetch_info = page["fetch"] + + await emitter.status("✅ Ingest fertig.", done=True) + + fetch_line = ( + f"`{fetch_info['mode']}` | HTTP `{fetch_info['status_code']}`" + + (f" | JSON-Responses: {fetch_info['intercepted_json_count']}" + if fetch_info["intercepted_json_count"] else "") + + (" | 🌳 AX-Tree" if fetch_info["has_accessibility_tree"] else "") + + (" | 📸 Screenshot" if fetch_info["has_screenshot"] else "") + ) + + lines = [ + "# 🗡️ WebSword\n", + f"**Site ID:** `{site_id}`", + f"**URL:** {page['url']}", + f"**Title:** {page.get('title', '')}", + f"**Fetch:** {fetch_line}", + "", + f"**Chunks:** {len(page.get('chunks', []))} | " + f"**Media:** {len(page.get('media', []))} | " + f"**Links:** {len(page.get('links', []))}", + "", + "## Next actions", + f"- `ws_map(site_id='{site_id}')`", + f"- `ws_get(site_id='{site_id}', chunk_id='c1')`", + f"- `ws_search(site_id='{site_id}', query='...')`", + f"- `ws_list_media(site_id='{site_id}')`", + "", + "## Available modes", + " `auto` · `static` · `stealth` · `intercept` · `cdp` · `rendered`", + ] + issues_short = (page.get("issues") or [])[:6] + if issues_short: + lines += ["", "## Issues (first 6)"] + for it in issues_short: + lines.append(f"- `{it.get('type')}`: {it.get('detail')}") + + _log_exit("ws_ingest", f"site_id={site_id}") + return "\n".join(lines) + + except Exception as e: + log.exception("[WebSword] ws_ingest | EXCEPTION: %s", e) + msg = f"❌ {type(e).__name__}: {e}" + await emitter.error(msg) + return msg + + async def ws_map( + self, + site_id: str, + __event_emitter__: Callable[[dict], Any] = None, + ) -> str: + """Struktur eines gespeicherten Manifests anzeigen.""" + _log_entry("ws_map", site_id=site_id) + emitter = EventEmitter(__event_emitter__) + if not self.valves.ENABLE_MAP: + return "❌ ws_map ist deaktiviert." + manifest = _WEBSWORD_STORE.get(site_id) + if not manifest: + return "❌ Unbekannte site_id — bitte zuerst ws_ingest aufrufen." 
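+
+        # Manifest layout (see _build_manifest): single-page manifests keep
+        # everything under pages[0], i.e. chunks, media, links, issues, fetch info.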
+ + page = manifest["pages"][0] + chunks = page.get("chunks", []) + media = page.get("media", []) + + lines = [ + "# 🧭 WebSword Map\n", + f"**Site ID:** `{site_id}`", + f"**Title:** {page.get('title', '')}", + f"**Fetch mode:** `{page['fetch']['mode']}`", + "", "## Chunks", + ] + for c in chunks[:30]: + lines.append( + f"- `{c.get('chunk_id')}` ({len(c.get('text') or '')} chars)" + f" — {(c.get('heading') or '')[:120]}" + ) + if len(chunks) > 30: + lines.append(f"- … ({len(chunks)-30} more)") + lines += ["", "## Media"] + if not media: + lines.append("- (none)") + else: + for m in media[:30]: + lines.append(f"- `{m.get('media_id')}` **{m.get('type')}** — {(m.get('src') or '')[:100]}") + if len(media) > 30: + lines.append(f"- … ({len(media)-30} more)") + + await emitter.status("✅ Map bereit.", done=True) + _log_exit("ws_map", f"chunks={len(chunks)} media={len(media)}") + return "\n".join(lines) + + async def ws_get( + self, + site_id: str, + chunk_id: str = "c1", + __event_emitter__: Callable[[dict], Any] = None, + ) -> str: + """Einen Chunk aus dem Manifest lesen.""" + _log_entry("ws_get", site_id=site_id, chunk_id=chunk_id) + emitter = EventEmitter(__event_emitter__) + if not self.valves.ENABLE_GET: + return "❌ ws_get ist deaktiviert." + manifest = _WEBSWORD_STORE.get(site_id) + if not manifest: + return "❌ Unbekannte site_id." + page = manifest["pages"][0] + for c in page.get("chunks", []): + if c.get("chunk_id") == chunk_id: + await emitter.status("✅ Chunk geladen.", done=True) + _log_exit("ws_get", f"chunk_id={chunk_id} len={len(c.get('text',''))}") + return f"# 📄 {c.get('heading', '')}\n\n**Chunk ID:** `{chunk_id}`\n\n{c.get('text', '')}" + available = [c.get("chunk_id") for c in page.get("chunks", [])] + log.warning("[WebSword] ws_get | unknown chunk_id=%s available=%s", chunk_id, available) + return f"❌ Unbekannte chunk_id. Verfuegbar: {available}" + + async def ws_search( + self, + site_id: str, + query: str, + limit: int = 8, + __event_emitter__: Callable[[dict], Any] = None, + ) -> str: + """Volltextsuche in einem gespeicherten Manifest.""" + _log_entry("ws_search", site_id=site_id, query=query) + emitter = EventEmitter(__event_emitter__) + if not self.valves.ENABLE_SEARCH: + return "❌ ws_search ist deaktiviert." + manifest = _WEBSWORD_STORE.get(site_id) + if not manifest: + return "❌ Unbekannte site_id." + q = (query or "").strip().lower() + if not q: + return "❌ Leere Suchanfrage." + page = manifest["pages"][0] + hits = [] + for c in page.get("chunks", []): + t = c.get("text") or "" + idx = t.lower().find(q) + if idx >= 0: + snippet = t[max(0, idx - 120): idx + 240] + hits.append((c.get("chunk_id"), c.get("heading", ""), snippet)) + if len(hits) >= int(limit): + break + lines = [f"# 🔍 WebSword Search: {query}\n", f"**Site ID:** `{site_id}`\n"] + if not hits: + lines.append("_Keine Treffer._") + else: + for cid, heading, snippet in hits: + lines.append(f"- `{cid}` — **{heading}**") + lines.append(f" > {snippet}…") + await emitter.status("✅ Suche fertig.", done=True) + _log_exit("ws_search", f"hits={len(hits)}") + return "\n".join(lines) + + async def ws_list_media( + self, + site_id: str, + media_type: Optional[str] = None, + __event_emitter__: Callable[[dict], Any] = None, + ) -> str: + """Media-Elemente eines Manifests auflisten.""" + _log_entry("ws_list_media", site_id=site_id, media_type=media_type) + emitter = EventEmitter(__event_emitter__) + if not self.valves.ENABLE_MEDIA: + return "❌ ws_list_media ist deaktiviert." 
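+
+        # media_type is optional: empty lists everything, "image" / "embed"
+        # narrow the list to one type (types are assigned in _extract_media_basic).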
+        manifest = _WEBSWORD_STORE.get(site_id)
+        if not manifest:
+            return "❌ Unbekannte site_id."
+        mt = (media_type or "").strip().lower()
+        media = manifest["pages"][0].get("media", [])
+        out = [m for m in media if not mt or m.get("type", "").lower() == mt]
+        lines = [f"# 🖼️ WebSword Media\n", f"**Site ID:** `{site_id}`\n"]
+        if not out:
+            lines.append("_Keine Media gefunden._")
+        else:
+            for m in out[:60]:
+                lines.append(f"- `{m.get('media_id')}` **{m.get('type')}** — {(m.get('src') or '')[:100]}")
+        await emitter.status("✅ Media-Liste bereit.", done=True)
+        _log_exit("ws_list_media", f"count={len(out)}")
+        return "\n".join(lines)
+
+    async def ws_embed(
+        self,
+        site_id: str,
+        media_id: str,
+        mode: str = "auto",
+        __event_emitter__: Callable[[dict], Any] = None,
+    ) -> str:
+        """Embed a media element as Markdown image / iframe / link."""
+        _log_entry("ws_embed", site_id=site_id, media_id=media_id, mode=mode)
+        emitter = EventEmitter(__event_emitter__)
+        if not self.valves.ENABLE_MEDIA:
+            return "❌ ws_embed ist deaktiviert."
+        manifest = _WEBSWORD_STORE.get(site_id)
+        if not manifest:
+            return "❌ Unbekannte site_id."
+        target = next(
+            (m for m in manifest["pages"][0].get("media", []) if m.get("media_id") == media_id),
+            None,
+        )
+        if not target:
+            return "❌ Unbekannte media_id."
+
+        mtype = (target.get("type") or "").lower()
+        src = target.get("src") or ""
+        provider = (target.get("provider") or _safe_domain(src)).lower()
+        mode = (mode or "auto").lower().strip()
+        allow = set(d.lower() for d in self.valves.EMBED_ALLOWLIST)
+
+        if mode == "auto":
+            if mtype == "image":
+                mode = "img"
+            elif mtype == "embed" and provider in allow:
+                mode = "iframe"
+            else:
+                mode = "link"
+
+        if mode == "iframe":
+            if provider not in allow:
+                await emitter.status("⚠️ Provider nicht erlaubt → Link.", done=True)
+                title = target.get("title") or target.get("alt") or _safe_domain(src) or src
+                return f"[{title}]({src})"
+            snippet = (
+                f'<iframe src="{src}" width="560" height="315" '
+                f'frameborder="0" allowfullscreen></iframe>'
+            )
+            await emitter.status("✅ Iframe bereit.", done=True)
+            _log_exit("ws_embed", "iframe")
+            return snippet
+
+        if mode == "img":
+            if mtype != "image":
+                return f"❌ Kein Bild (type={mtype})."
+            alt = target.get("alt") or target.get("title") or src
+            await emitter.status("✅ Bild bereit.", done=True)
+            _log_exit("ws_embed", "img")
+            return f"![{alt}]({src})"
+
+        # link
+        title = target.get("title") or target.get("alt") or _safe_domain(src) or src
+        await emitter.status("✅ Link bereit.", done=True)
+        _log_exit("ws_embed", "link")
+        return f"[{title}]({src})"
+
+    async def excalibur(
+        self,
+        url: str,
+        mode: str = "auto",
+        __event_emitter__: Callable[[dict], Any] = None,
+    ) -> str:
+        """
+        Deep-crawl a whole site (BFS) with a compressed result.
+
+        Scans the start page plus all subpages of the same domain down to the
+        configured depth. The same mode parameter as in ws_ingest is applied
+        to every crawled page.
+
+        mode: auto | static | stealth | intercept | cdp
+            For normal sites: auto or static.
+            For bot-protected sites: stealth.
+            For XHR-based sites (TikTok etc.): intercept.
+            For sites without hyperlinks: cdp.
+
+        Valves:
+            ENABLE_EXCALIBUR (bool, default=False): must be enabled explicitly
+            EXCALIBUR_MAX_DEPTH (int, default=2)
+            EXCALIBUR_MAX_PAGES (int, default=20)
+            EXCALIBUR_DELAY_S (float, default=0.5)
+            EXCALIBUR_CHUNK_PREVIEW_CHARS (int, default=500)
+            EXCALIBUR_TOKEN_WARN_THRESHOLD (int, default=50000)
+
+        For single pages, use ws_ingest().
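+
+        Example (sketch; `tools` is the Tools() instance created by OWUI and
+        ENABLE_EXCALIBUR=True is required):
+
+            site_map_md = await tools.excalibur("https://example.com", mode="static")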
+ """ + _log_entry("excalibur", url=url, mode=mode) + emitter = EventEmitter(__event_emitter__) + + if not self.valves.ENABLE_EXCALIBUR: + log.warning("[WebSword] excalibur | ENABLE_EXCALIBUR=False") + await emitter.error("Excalibur ist deaktiviert.") + return ( + "⚔️ **Excalibur ist deaktiviert.**\n\n" + "Fuer gezielte Seiten bitte ws_ingest() nutzen:\n" + "- `ws_ingest(url)` — Einzelseite (auto)\n" + "- `ws_ingest(url, mode='stealth')` — Bot-geschuetzte Seite\n" + "- `ws_ingest(url, mode='intercept')` — XHR-basierte Seite\n" + "- `ws_ingest(url, mode='cdp')` — Site ohne Hyperlinks\n\n" + "Aktivieren: Admin-Valves → `ENABLE_EXCALIBUR=True`" + ) + + if not url or not isinstance(url, str): + return "❌ Bitte eine gueltige URL angeben." + base_domain = _safe_domain(url) + if not base_domain: + return "❌ Domain konnte nicht ermittelt werden." + + def _is_same_domain(u: str) -> bool: + return _safe_domain(u) == base_domain + + try: + queue = [(_normalize_url(url), 0)] + visited = {_normalize_url(url)} + pages_data = [] + + await emitter.status( + f"⚔️ Excalibur: {url} | mode={mode} | " + f"depth={self.valves.EXCALIBUR_MAX_DEPTH} " + f"max_pages={self.valves.EXCALIBUR_MAX_PAGES}" + ) + + while queue and len(pages_data) < self.valves.EXCALIBUR_MAX_PAGES: + current_url, depth = queue.pop(0) + _log_step("excalibur", "crawling", url=current_url, depth=depth, done=len(pages_data)) + await emitter.status( + f"⚔️ Seite {len(pages_data)+1} | Tiefe {depth} | {current_url}" + ) + + fetch_meta, issues = self._acquire(current_url, mode=mode) + + # Links fuer BFS sammeln + if depth < self.valves.EXCALIBUR_MAX_DEPTH: + for lnk in self._extract_links_basic(fetch_meta.get("html") or ""): + href = lnk.get("href") or "" + resolved = _normalize_url(_resolve_url(href, current_url)) + if ( + resolved not in visited + and _is_same_domain(resolved) + and resolved.startswith("http") + and len(visited) < self.valves.EXCALIBUR_MAX_PAGES * 3 + ): + visited.add(resolved) + queue.append((resolved, depth + 1)) + + page = self._sieve(fetch_meta, issues) + pages_data.append({ + "url": current_url, + "depth": depth, + "title": page.get("title") or current_url, + "toc": page.get("toc") or [], + "chunks": page.get("chunks") or [], + "media": page.get("media") or [], + "fetch_mode": fetch_meta.get("mode") or "static", + "status_code": fetch_meta.get("status_code") or 0, + }) + + if queue: + time.sleep(self.valves.EXCALIBUR_DELAY_S) + + await emitter.status("⚔️ Excalibur: komprimiere Ergebnis …") + _log_step("excalibur", "crawl done", pages=len(pages_data)) + + preview_chars = int(self.valves.EXCALIBUR_CHUNK_PREVIEW_CHARS) + lines = [ + "# ⚔️ Excalibur Site Map\n", + f"**Base URL:** {url}", + f"**Domain:** {base_domain}", + f"**Mode:** `{mode}`", + f"**Gecrawlt:** {len(pages_data)} Seiten | max Tiefe {self.valves.EXCALIBUR_MAX_DEPTH}", + "", + ] + total_chars = 0 + + for i, pd in enumerate(pages_data, 1): + lines.append(f"---\n## Seite {i}: {pd['title']}") + lines.append( + f"**URL:** {pd['url']} | **Tiefe:** {pd['depth']} | " + f"**HTTP:** {pd['status_code']} | **mode:** `{pd['fetch_mode']}`" + ) + if pd["toc"]: + toc_str = " · ".join(t.get("text", "") for t in pd["toc"][:8]) + lines.append(f"**TOC:** {toc_str}") + if pd["chunks"]: + lines.append("**Inhalt (Vorschau):**") + for c in pd["chunks"]: + text = (c.get("text") or "")[:preview_chars] + if text: + lines.append(f"> {text.replace(chr(10), ' ')}") + total_chars += len(text) + if pd["media"]: + media_str = " · ".join( + f"{m.get('type','?')}:{(m.get('src') or '')[:60]}" + for 
m in pd["media"][:6] + ) + lines.append(f"**Media:** {media_str}") + if len(pd["media"]) > 6: + lines.append(f" … +{len(pd['media'])-6} weitere") + lines.append("") + + estimated_tokens = total_chars // 4 + lines += [ + "---", + f"**Geschaetzte Token (Inhalt):** ~{estimated_tokens:,}", + ] + if estimated_tokens > self.valves.EXCALIBUR_TOKEN_WARN_THRESHOLD: + lines.append( + f"⚠️ Ueberschreitet Schwellwert " + f"({self.valves.EXCALIBUR_TOKEN_WARN_THRESHOLD:,} Token). " + "Erwaege ws_ingest + ws_get fuer gezieltere Abfragen." + ) + + await emitter.status("✅ Excalibur fertig.", done=True) + _log_exit("excalibur", f"pages={len(pages_data)} ~tokens={estimated_tokens}") + return "\n".join(lines) + + except Exception as e: + log.exception("[WebSword] excalibur | EXCEPTION: %s", e) + msg = f"❌ {type(e).__name__}: {e}" + await emitter.error(msg) + return msg + + async def ws_dump( + self, + site_id: str, + __event_emitter__: Callable[[dict], Any] = None, + ) -> str: + """Vollstaendiges Manifest als JSON (Debug). Nicht direkt ans LLM fuettern.""" + _log_entry("ws_dump", site_id=site_id) + emitter = EventEmitter(__event_emitter__) + manifest = _WEBSWORD_STORE.get(site_id) + if not manifest: + return "❌ Unbekannte site_id." + import json + await emitter.status("✅ Dump bereit.", done=True) + _log_exit("ws_dump") + return "```json\n" + json.dumps(manifest, ensure_ascii=False, indent=2) + "\n```"