"""
title: WebSword
description: Universal website ingestion & navigation manifest for LLMs (OWUI internal).
author: Jan + ChatGPT + Claude
version: 0.3-dev
license: MIT
requirements: requests, pydantic, trafilatura, lxml, playwright, playwright-stealth
Changelog:
0.1.0 - initial release
0.1.1 - comprehensive debug logging, NoneType guards
0.2-dev - Markdown media rendering, Excalibur BFS-crawler,
stealth + intercept fetch strategies (stubs)
0.3-dev - MAJOR REFACTOR: unified mode-based API
ws_stealth_scrape / ws_intercept_scrape removed
ws_ingest(mode=) is the single central entry point:
mode: auto | static | rendered | stealth | intercept | cdp
excalibur(mode=) takes the same mode parameter
_acquire() is the central dispatcher for all fetch strategies
CDP / accessibility tree prepared as a stub (mode='cdp')
Fetch strategies (all selected via mode=):
static - plain HTTP fetch, trafilatura extraction
rendered - Playwright without stealth (stub)
stealth - Playwright + playwright-stealth (stub)
intercept - Playwright + XHR/JSON sniffer (stub)
cdp - Playwright + Chrome DevTools Protocol / accessibility tree (stub)
auto - static first; too little text -> stealth as fallback (when enabled)
Public API:
ws_ingest(url, mode='auto') - fetch & store a page
ws_map(site_id) - show site structure
ws_get(site_id, chunk_id) - read one chunk
ws_search(site_id, query) - full-text search
ws_list_media(site_id) - list media items
ws_embed(site_id, media_id) - embed a media item
excalibur(url, mode='auto') - deep-crawl an entire site
ws_dump(site_id) - debug: dump the full manifest
"""
import re
import time
import html
import hashlib
import logging
from typing import Any, Callable, Dict, List, Optional, Tuple
import requests
from pydantic import BaseModel, Field
# ─────────────────────────────────────────────
# Logging
# ─────────────────────────────────────────────
# NOTE(review): basicConfig(level=DEBUG) configures the process-wide root
# logger, not just this module — confirm this is intended inside OWUI.
logging.basicConfig(level=logging.DEBUG)
log = logging.getLogger("WebSword")
# Canonical set of fetch strategies accepted by ws_ingest / _acquire.
VALID_MODES = ("auto", "static", "rendered", "stealth", "intercept", "cdp")
def _log_entry(fn: str, **kwargs) -> None:
    """Trace entry into *fn* with its call arguments (DEBUG level)."""
    log.debug("[WebSword] ▶ %s | %s", fn, kwargs)
def _log_exit(fn: str, summary: str = "") -> None:
    """Trace exit from *fn* with a short result summary (DEBUG level)."""
    log.debug("[WebSword] ◀ %s | %s", fn, summary)
def _log_step(fn: str, step: str, **kwargs) -> None:
    """Trace an intermediate *step* inside *fn* (DEBUG level)."""
    log.debug("[WebSword] ⟳ %s | %s | %s", fn, step, kwargs)
# ─────────────────────────────────────────────
# EventEmitter
# ─────────────────────────────────────────────
class EventEmitter:
    """Thin async wrapper around the OWUI event-emitter callback.

    Every method is a silent no-op when no callback was supplied.
    """

    def __init__(self, event_emitter: Callable[[dict], Any] = None):
        self.event_emitter = event_emitter

    async def status(self, description: str, done: bool = False):
        """Send a status event to the UI (if an emitter is configured)."""
        if not self.event_emitter:
            return
        payload = {"type": "status", "data": {"description": description, "done": done}}
        await self.event_emitter(payload)

    async def error(self, description: str):
        """Send a terminal error status, prefixed with ❌."""
        if not self.event_emitter:
            return
        payload = {"type": "status", "data": {"description": f"❌ {description}", "done": True}}
        await self.event_emitter(payload)
# ─────────────────────────────────────────────
# Module-level helpers
# ─────────────────────────────────────────────
def _now_unix() -> int:
return int(time.time())
def _sha256(text: str) -> str:
return hashlib.sha256(text.encode("utf-8")).hexdigest()
def _truncate_bytes(s: str, max_bytes: int) -> str:
b = s.encode("utf-8", errors="ignore")
return s if len(b) <= max_bytes else b[:max_bytes].decode("utf-8", errors="ignore")
def _basic_visible_text_estimate(html_text: str) -> int:
t = re.sub(r"(?is)<(script|style|noscript)\b.*?>.*?\1>", " ", html_text)
t = re.sub(r"(?s)<[^>]+>", " ", t)
t = html.unescape(t)
return len(re.sub(r"\s+", " ", t).strip())
def _looks_like_needs_render(html_text: str, min_len: int) -> bool:
    """True when the visible-text estimate falls below *min_len*."""
    visible = _basic_visible_text_estimate(html_text)
    verdict = visible < min_len
    log.debug("[WebSword] _looks_like_needs_render | visible=%d min=%d -> %s", visible, min_len, verdict)
    return verdict
def _extract_title_basic(html_text: str) -> str:
m = re.search(r"(?is)
]*>(.*?)", html_text)
if not m:
return ""
return re.sub(r"\s+", " ", html.unescape(m.group(1))).strip()[:200]
def _safe_domain(url: str) -> str:
m = re.match(r"^https?://([^/]+)/?", (url or "").strip())
return m.group(1).lower() if m else ""
def _normalize_url(u: str) -> str:
return u.split("#")[0].rstrip("/")
def _resolve_url(href: str, current: str) -> str:
if href.startswith("http://") or href.startswith("https://"):
return href
if href.startswith("//"):
scheme = current.split("://")[0] if "://" in current else "https"
return f"{scheme}:{href}"
if href.startswith("/"):
m = re.match(r"^(https?://[^/]+)", current)
return f"{m.group(1)}{href}" if m else href
base = current.rsplit("/", 1)[0]
return f"{base}/{href}"
# ─────────────────────────────────────────────
# In-memory store
# ─────────────────────────────────────────────
# Process-local, unbounded cache of ingested sites. Presumably keyed by
# site_id (see the ws_* API) — verify against the ingest path. Contents are
# lost on restart; there is no eviction here.
_WEBSWORD_STORE: Dict[str, Dict[str, Any]] = {}
# ─────────────────────────────────────────────
# Empty fetch_meta factory
# ─────────────────────────────────────────────
def _empty_fetch_meta(url: str, mode: str) -> Dict[str, Any]:
    """Fresh fetch-metadata skeleton for *url* under strategy *mode*.

    Content fields start empty/zero; each fetcher overwrites only what it
    actually obtained, so consumers can rely on every key being present.
    """
    skeleton: Dict[str, Any] = {
        "mode": mode,
        "url": url,
        "final_url": url,
        "status_code": 0,
        "content_type": "",
        "headers": {},
        "html": "",
        "intercepted_json": [],
        "accessibility_tree": None,
        "screenshot_b64": None,
        "fetched_at_unix": _now_unix(),
    }
    return skeleton
# ─────────────────────────────────────────────
# Tools class
# ─────────────────────────────────────────────
class Tools:
class Valves(BaseModel):
    """User-tunable settings, exposed in the Open WebUI valve editor.

    Field descriptions are shown verbatim in the OWUI admin UI and are
    therefore left untouched (German, as shipped).
    """
    # HTTP
    USER_AGENT: str = Field(
        default="WebSword/0.3 (+open-webui)",
        description="User-Agent fuer HTTP-Requests.",
    )
    TIMEOUT_S: int = Field(default=15, description="HTTP-Timeout in Sekunden.")
    MAX_HTML_BYTES: int = Field(
        default=2_000_000, description="Max. Bytes vom gefetchten HTML."
    )
    # Feature flags — each gates one public ws_* entry point.
    ENABLE_INGEST: bool = Field(default=True, description="ws_ingest erlauben.")
    ENABLE_MAP: bool = Field(default=True, description="ws_map erlauben.")
    ENABLE_GET: bool = Field(default=True, description="ws_get erlauben.")
    ENABLE_SEARCH: bool = Field(default=True, description="ws_search erlauben.")
    ENABLE_MEDIA: bool = Field(default=True, description="ws_list_media / ws_embed erlauben.")
    ENABLE_EXCALIBUR: bool = Field(
        default=False,
        description="Excalibur Deep-crawl. DEAKTIVIERT by default — kann viele Requests ausloesen.",
    )
    # Playwright modes (all default=False; each needs `playwright install chromium`)
    ENABLE_RENDERED_FETCH: bool = Field(
        default=False,
        description="mode='rendered': Playwright ohne stealth. Benoetigt: playwright install chromium.",
    )
    ENABLE_STEALTH_SCRAPE: bool = Field(
        default=False,
        description="mode='stealth': Playwright + playwright-stealth. Benoetigt: playwright install chromium.",
    )
    ENABLE_INTERCEPT_FETCH: bool = Field(
        default=False,
        description=(
            "mode='intercept': Playwright XHR/JSON-Sniffer. "
            "Plattformunabhaengig fuer SPAs (TikTok, Instagram, Reddit ...). "
            "Benoetigt: playwright install chromium."
        ),
    )
    ENABLE_CDP_FETCH: bool = Field(
        default=False,
        description=(
            "mode='cdp': Playwright + Chrome DevTools Protocol (Accessibility Tree). "
            "Macht Sites ohne Hyperlinks navigierbar. "
            "Benoetigt: playwright install chromium."
        ),
    )
    ENABLE_STEALTH_SCREENSHOT: bool = Field(
        default=False,
        description="Bei stealth/cdp/intercept: Screenshot als base64-PNG anhaengen.",
    )
    # Playwright timeouts (shared by all Playwright modes)
    PLAYWRIGHT_TIMEOUT_MS: int = Field(
        default=20000,
        description="Playwright page-load Timeout in ms (gilt fuer alle Playwright-Modi).",
    )
    PLAYWRIGHT_WAIT_UNTIL: str = Field(
        default="networkidle",
        description="Playwright wait_until: networkidle | domcontentloaded | load | commit.",
    )
    # Intercept-specific knobs
    INTERCEPT_MAX_RESPONSES: int = Field(
        default=20, description="Max. JSON-Responses die der Interceptor sammelt.",
    )
    INTERCEPT_MIN_JSON_BYTES: int = Field(
        default=64, description="Min. Bytes einer JSON-Response (filtert triviale).",
    )
    # Auto mode
    AUTO_RENDER_MIN_TEXT_LEN: int = Field(
        default=500,
        description="auto-mode: Wenn sichtbarer Text kuerzer, wird stealth versucht (falls aktiv).",
    )
    # Extraction
    TRAFILATURA_FAVOR: bool = Field(
        default=True, description="trafilatura bevorzugen wenn Qualitaet ausreichend.",
    )
    TRAFI_MIN_TEXT_LEN: int = Field(
        default=800, description="Min. Textlaenge damit trafilatura als erfolgreich gilt.",
    )
    MAX_CHUNK_CHARS: int = Field(default=8000, description="Max. Zeichen pro Chunk.")
    CHUNK_TARGET_CHARS: int = Field(default=2200, description="Ziel-Chunk-Groesse.")
    CHUNK_OVERLAP_CHARS: int = Field(default=120, description="Ueberlappung zwischen Chunks.")
    MAX_CHUNKS: int = Field(default=12, description="Max. Chunks pro Seite.")
    MAX_MEDIA_ITEMS: int = Field(default=40, description="Max. Media-Items.")
    MAX_LINK_ITEMS: int = Field(default=60, description="Max. Links.")
    # Embed
    EMBED_ALLOWLIST: List[str] = Field(
        default_factory=lambda: [
            "youtube.com", "www.youtube.com", "youtu.be",
            "player.vimeo.com", "vimeo.com",
            "soundcloud.com", "w.soundcloud.com",
            "open.spotify.com",
        ],
        description="Erlaubte iframe-Domains.",
    )
    # Excalibur deep-crawl limits
    EXCALIBUR_MAX_DEPTH: int = Field(default=2, description="Excalibur: Max. Crawl-Tiefe.")
    EXCALIBUR_MAX_PAGES: int = Field(default=20, description="Excalibur: Max. Seiten.")
    EXCALIBUR_DELAY_S: float = Field(default=0.5, description="Excalibur: Pause zwischen Requests in Sekunden.")
    EXCALIBUR_CHUNK_PREVIEW_CHARS: int = Field(
        default=500, description="Excalibur: Max. Zeichen pro Chunk in der komprimierten Ausgabe.",
    )
    EXCALIBUR_TOKEN_WARN_THRESHOLD: int = Field(
        default=50000, description="Excalibur: Warnung wenn geschaetzte Token diesen Wert uebersteigen.",
    )
def __init__(self):
    # Valves hold the OWUI-configurable settings; instantiated with
    # defaults here, the UI overrides them at runtime.
    self.valves = self.Valves()
# ─────────────────────────────────────────
# Layer A: fetch strategies (internal)
# ─────────────────────────────────────────
def _fetch_static(self, url: str) -> Tuple[Dict[str, Any], List[Dict[str, Any]]]:
    """Plain HTTP GET of *url*; returns (fetch_meta, issues).

    Never raises: any failure is reported as a "fetch_static_failed" issue
    together with an empty fetch_meta, so _acquire() can fall back.
    """
    _log_entry("_fetch_static", url=url)
    issues: List[Dict[str, Any]] = []
    headers = {
        "User-Agent": self.valves.USER_AGENT,
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
    }
    try:
        r = requests.get(url, headers=headers, timeout=self.valves.TIMEOUT_S, allow_redirects=True)
        ct = r.headers.get("Content-Type", "")
        # Respect the declared encoding when present; otherwise decode
        # leniently so binary payloads cannot raise.
        raw = r.text if r.encoding else r.content.decode("utf-8", errors="ignore")
        raw = _truncate_bytes(raw, self.valves.MAX_HTML_BYTES)
        _log_step("_fetch_static", "response", status=r.status_code, ct=ct, html_len=len(raw))
        # Fix: media-type comparison must be case-insensitive (RFC 9110);
        # "Text/HTML" previously produced a spurious non_html issue.
        ct_lower = ct.lower()
        if "text/html" not in ct_lower and "application/xhtml" not in ct_lower:
            issues.append({"type": "non_html_content_type", "detail": ct})
        meta = _empty_fetch_meta(url, "static")
        meta.update({
            "final_url": str(r.url),
            "status_code": int(r.status_code),
            "content_type": ct,
            "headers": dict(r.headers),
            "html": raw,
        })
        _log_exit("_fetch_static", f"status={r.status_code} len={len(raw)}")
        return meta, issues
    except Exception as e:
        log.exception("[WebSword] _fetch_static | EXCEPTION: %s", e)
        issues.append({"type": "fetch_static_failed", "detail": repr(e)})
        return _empty_fetch_meta(url, "static"), issues
def _fetch_rendered(self, url: str) -> Tuple[Dict[str, Any], List[Dict[str, Any]]]:
    """Playwright without stealth — stub in v0.3, never actually fetches.

    Always returns an empty fetch_meta plus exactly one issue explaining
    why: "rendered_disabled" when the valve is off, otherwise
    "rendered_not_implemented".

    TODO(v0.3) implementation sketch:
        async with async_playwright() as pw:
            browser = await pw.chromium.launch(headless=True)
            page = await browser.new_page()
            resp = await page.goto(url,
                                   wait_until=self.valves.PLAYWRIGHT_WAIT_UNTIL,
                                   timeout=self.valves.PLAYWRIGHT_TIMEOUT_MS)
            raw_html = await page.content()
            await browser.close()
    """
    _log_entry("_fetch_rendered", url=url)
    issues: List[Dict[str, Any]] = []
    if not self.valves.ENABLE_RENDERED_FETCH:
        issues.append({"type": "rendered_disabled", "detail": "ENABLE_RENDERED_FETCH=False"})
    else:
        issues.append({"type": "rendered_not_implemented", "detail": "stub v0.3"})
    return _empty_fetch_meta(url, "rendered"), issues
def _fetch_stealth(self, url: str) -> Tuple[Dict[str, Any], List[Dict[str, Any]]]:
    """Playwright + playwright-stealth — stub in v0.3, never actually fetches.

    Intended to bypass standard headless detection (canvas fingerprinting
    etc.). Known limits: Cloudflare Turnstile, hCaptcha, hard login walls.
    Always returns an empty fetch_meta plus exactly one issue:
    "stealth_disabled" when the valve is off, otherwise
    "stealth_not_implemented".

    TODO(v0.3) implementation sketch:
        from playwright.async_api import async_playwright
        from playwright_stealth import stealth_async
        async with async_playwright() as pw:
            browser = await pw.chromium.launch(headless=True)
            ctx = await browser.new_context(
                user_agent=self.valves.USER_AGENT,
                viewport={"width": 1280, "height": 800},
                locale="de-DE",
            )
            page = await ctx.new_page()
            await stealth_async(page)
            resp = await page.goto(url,
                                   wait_until=self.valves.PLAYWRIGHT_WAIT_UNTIL,
                                   timeout=self.valves.PLAYWRIGHT_TIMEOUT_MS)
            raw_html = await page.content()
            # optional full-page screenshot (base64) when
            # ENABLE_STEALTH_SCREENSHOT is set, then browser.close();
            # fill meta["html"], meta["status_code"], meta["final_url"],
            # meta["screenshot_b64"].
    """
    _log_entry("_fetch_stealth", url=url)
    issues: List[Dict[str, Any]] = []
    if not self.valves.ENABLE_STEALTH_SCRAPE:
        issues.append({"type": "stealth_disabled", "detail": "ENABLE_STEALTH_SCRAPE=False"})
    else:
        issues.append({"type": "stealth_not_implemented", "detail": "stub v0.3"})
    return _empty_fetch_meta(url, "stealth"), issues
async def _fetch_intercept(self, url: str) -> Tuple[Dict[str, Any], List[Dict[str, Any]]]:
    """Playwright XHR/JSON sniffer — stub in v0.3, never actually fetches.

    Idea: passively record every application/json response while the page
    loads; platform-agnostic for SPAs (TikTok, Instagram, Twitter/X,
    Reddit, Amazon, ...). Always returns an empty fetch_meta plus exactly
    one issue: "intercept_disabled" when the valve is off, otherwise
    "intercept_not_implemented".

    TODO(v0.3) implementation sketch:
        register page.on("response", handler); the handler keeps the body
        of every application/json response that is at least
        INTERCEPT_MIN_JSON_BYTES bytes; navigate with
        PLAYWRIGHT_WAIT_UNTIL / PLAYWRIGHT_TIMEOUT_MS, read page.content(),
        then fill meta["html"], meta["status_code"], meta["final_url"] and
        meta["intercepted_json"] capped at INTERCEPT_MAX_RESPONSES.
    """
    _log_entry("_fetch_intercept", url=url)
    issues: List[Dict[str, Any]] = []
    if not self.valves.ENABLE_INTERCEPT_FETCH:
        issues.append({"type": "intercept_disabled", "detail": "ENABLE_INTERCEPT_FETCH=False"})
    else:
        issues.append({"type": "intercept_not_implemented", "detail": "stub v0.3"})
    return _empty_fetch_meta(url, "intercept"), issues
def _fetch_cdp(self, url: str) -> Tuple[Dict[str, Any], List[Dict[str, Any]]]:
    """Playwright + Chrome DevTools Protocol — stub in v0.3, never fetches.

    Goal: make sites without classic hyperlinks navigable:
      - accessibility tree: semantic structure independent of the visual DOM
      - inventory of interactive elements (buttons, ARIA roles, onClick)
      - pierce shadow DOM
    Always returns an empty fetch_meta plus exactly one issue:
    "cdp_disabled" when the valve is off, otherwise "cdp_not_implemented".

    TODO(v0.3) implementation sketch:
        open a CDP session via page.context.new_cdp_session(page), navigate
        with PLAYWRIGHT_WAIT_UNTIL / PLAYWRIGHT_TIMEOUT_MS, then:
          - ax_tree = await client.send("Accessibility.getFullAXTree")
          - query all clickables (button, [role=button|link|menuitem],
            [tabindex], [onclick], a, input[type=submit]) and record
            label / visibility / tag per element
          - optional full-page screenshot (base64) when
            ENABLE_STEALTH_SCREENSHOT is set
        and fill meta["html"], meta["status_code"], meta["final_url"],
        meta["accessibility_tree"], meta["interactive_elements"],
        meta["screenshot_b64"].
    """
    _log_entry("_fetch_cdp", url=url)
    issues: List[Dict[str, Any]] = []
    if not self.valves.ENABLE_CDP_FETCH:
        issues.append({"type": "cdp_disabled", "detail": "ENABLE_CDP_FETCH=False"})
    else:
        issues.append({"type": "cdp_not_implemented", "detail": "stub v0.3"})
    return _empty_fetch_meta(url, "cdp"), issues
# ─────────────────────────────────────────
# Layer B: _acquire() — central dispatcher
# ─────────────────────────────────────────
def _acquire(self, url: str, mode: str) -> Tuple[Dict[str, Any], List[Dict[str, Any]]]:
    """Central fetch dispatcher for all strategies.

    mode: auto | static | rendered | stealth | intercept | cdp
    Fallback chains:
        auto      -> static; too little text -> stealth (if enabled) -> static
        rendered  -> static (when rendered comes back empty)
        stealth   -> static (when stealth comes back empty)
        intercept -> stealth (if enabled) -> static
        cdp       -> stealth (if enabled) -> static

    Returns (fetch_meta, issues); issues accumulate across every attempted
    strategy so the caller can see the full decision trail.
    """
    _log_entry("_acquire", url=url, mode=mode)
    mode = (mode or "auto").lower().strip()
    if mode not in VALID_MODES:
        log.warning("[WebSword] _acquire | unknown mode '%s', using auto", mode)
        mode = "auto"

    def _fallback_static(reason: str, prior_issues: list) -> Tuple[Dict, List]:
        # Last-resort path: plain HTTP fetch, tagged with the reason.
        log.warning("[WebSword] _acquire | fallback to static: %s", reason)
        meta, issues = self._fetch_static(url)
        issues = prior_issues + issues
        issues.append({"type": "acquire_fallback_static", "detail": reason})
        return meta, issues

    def _fallback_stealth_then_static(reason: str, prior_issues: list) -> Tuple[Dict, List]:
        if self.valves.ENABLE_STEALTH_SCRAPE:
            log.warning("[WebSword] _acquire | fallback to stealth: %s", reason)
            meta, issues = self._fetch_stealth(url)
            issues = prior_issues + issues
            issues.append({"type": "acquire_fallback_stealth", "detail": reason})
            if (meta.get("html") or "").strip():
                return meta, issues
            # Fix: carry the failed stealth attempt's issues into the static
            # fallback instead of silently discarding them.
            return _fallback_static(reason + "_then_static", issues)
        return _fallback_static(reason + "_then_static", prior_issues)

    # static — no fallback, what you see is what you get.
    if mode == "static":
        meta, issues = self._fetch_static(url)
        issues.append({"type": "acquire_mode", "detail": "static"})
        _log_exit("_acquire", "static")
        return meta, issues

    # rendered — fall back to static when it produced no HTML.
    if mode == "rendered":
        meta, issues = self._fetch_rendered(url)
        if not (meta.get("html") or "").strip():
            return _fallback_static("rendered_empty", issues)
        issues.append({"type": "acquire_mode", "detail": "rendered"})
        _log_exit("_acquire", "rendered")
        return meta, issues

    # stealth — fall back to static when it produced no HTML.
    if mode == "stealth":
        meta, issues = self._fetch_stealth(url)
        if not (meta.get("html") or "").strip():
            return _fallback_static("stealth_empty", issues)
        issues.append({"type": "acquire_mode", "detail": "stealth"})
        _log_exit("_acquire", "stealth")
        return meta, issues

    # intercept — the only async fetcher; bridge into sync context.
    if mode == "intercept":
        import asyncio
        try:
            try:
                asyncio.get_running_loop()
                in_loop = True
            except RuntimeError:
                in_loop = False
            if in_loop:
                # A loop is already running (normal OWUI case): run the
                # coroutine on a private loop in a worker thread.
                import concurrent.futures
                with concurrent.futures.ThreadPoolExecutor() as pool:
                    future = pool.submit(asyncio.run, self._fetch_intercept(url))
                    meta, issues = future.result(
                        timeout=self.valves.PLAYWRIGHT_TIMEOUT_MS / 1000 + 5)
            else:
                # Fix: asyncio.get_event_loop() is deprecated without a
                # running loop and raises in non-main threads; asyncio.run
                # creates and tears down a fresh loop correctly.
                meta, issues = asyncio.run(self._fetch_intercept(url))
        except Exception as e:
            log.error("[WebSword] _acquire | intercept call failed: %s", e)
            meta = _empty_fetch_meta(url, "intercept")
            issues = [{"type": "intercept_call_failed", "detail": repr(e)}]
        if not (meta.get("html") or "").strip() and not meta.get("intercepted_json"):
            return _fallback_stealth_then_static("intercept_empty", issues)
        issues.append({"type": "acquire_mode", "detail": "intercept"})
        _log_exit("_acquire", "intercept")
        return meta, issues

    # cdp — fall back to stealth, then static.
    if mode == "cdp":
        meta, issues = self._fetch_cdp(url)
        if not (meta.get("html") or "").strip():
            return _fallback_stealth_then_static("cdp_empty", issues)
        issues.append({"type": "acquire_mode", "detail": "cdp"})
        _log_exit("_acquire", "cdp")
        return meta, issues

    # auto — static first, stealth only when the page looks JS-rendered.
    meta, issues = self._fetch_static(url)
    _log_step("_acquire", "auto static done",
              status=meta.get("status_code"), html_len=len(meta.get("html") or ""))
    if meta.get("status_code") in (401, 403):
        issues.append({"type": "access_restricted", "detail": f"status={meta['status_code']}"})
    html_text = meta.get("html") or ""
    if _looks_like_needs_render(html_text, self.valves.AUTO_RENDER_MIN_TEXT_LEN):
        if self.valves.ENABLE_STEALTH_SCRAPE:
            log.debug("[WebSword] _acquire | auto: static insufficient, trying stealth")
            s_meta, s_issues = self._fetch_stealth(url)
            if (s_meta.get("html") or "").strip():
                s_issues.extend(issues)
                s_issues.append({"type": "auto_used_stealth", "detail": "static_insufficient"})
                _log_exit("_acquire", "auto->stealth")
                return s_meta, s_issues
            # Fix: keep the failed stealth attempt's issues instead of
            # dropping them on the floor.
            issues.extend(s_issues)
        issues.append({"type": "auto_used_static", "detail": "stealth_inactive_or_failed"})
    else:
        issues.append({"type": "auto_used_static", "detail": "sufficient_text"})
    _log_exit("_acquire", "auto->static")
    return meta, issues
# ─────────────────────────────────────────
# Layer C: extraction
# ─────────────────────────────────────────
def _extract_links_basic(self, html_text: str) -> List[Dict[str, str]]:
    """Collect up to MAX_LINK_ITEMS href targets from anchor tags.

    Skips empty, fragment-only (#...) and javascript: pseudo-links.
    Returns a list of {"href": ...} dicts in document order.
    """
    out: List[Dict[str, str]] = []
    # Fix: the pattern had lost its '<a' opener and matched any
    # href= attribute anywhere; anchor it to actual <a ...> tags.
    for m in re.finditer(r'(?is)<a\b[^>]*href=["\']([^"\']+)["\']', html_text or ""):
        href = (m.group(1) or "").strip()
        if not href or href.startswith("#") or href.lower().startswith("javascript:"):
            continue
        out.append({"href": href})
        if len(out) >= self.valves.MAX_LINK_ITEMS:
            break
    return out
def _extract_media_basic(self, html_text: str) -> List[Dict[str, Any]]:
media: List[Dict[str, Any]] = []
for m in re.finditer(
r'(?is)
]*src=["\']([^"\']+)["\']([^>]*)>', html_text or ""
):
src = (m.group(1) or "").strip()
rest = m.group(2) or ""
alt_m = re.search(r'(?is)\balt=["\']([^"\']*)["\']', rest)
alt = (alt_m.group(1) if alt_m else "").strip()
media.append({"media_id": None, "type": "image", "src": src, "alt": alt})
if len(media) >= self.valves.MAX_MEDIA_ITEMS:
break
if len(media) < self.valves.MAX_MEDIA_ITEMS:
for m in re.finditer(
r'(?is)