"""Streamlit front end for the sitemap crawler and the page importer tools."""

from __future__ import annotations

import contextlib
import csv
import html
import importlib.util
import io
import os
import re
import sys
import time
from collections import deque
from itertools import islice
from pathlib import Path
from types import ModuleType
from typing import Any

import streamlit as st
import streamlit.components.v1 as components

ROOT_DIR = Path(__file__).resolve().parent
PAGE_IMPORTER_DIR = ROOT_DIR / "Page Importer"
SITEMAP_BUILDER_PATH = ROOT_DIR / "Sitemap Builder" / "sitemap_builder.py"
APP_DATA_DIR = Path(os.environ.get("APP_DATA_DIR", ROOT_DIR / ".data")).resolve()
SITEMAP_OUTPUT_DIR = APP_DATA_DIR / "sitemaps"

# Runs of characters that are not allowed in an output file stem.
_UNSAFE_NAME_CHARS = re.compile(r"[^A-Za-z0-9._-]+")


def load_module(module_name: str, file_path: Path) -> ModuleType:
    """Load and execute the Python file at *file_path* as *module_name*.

    The module is registered in ``sys.modules`` before it is executed. If
    execution fails, the registration is removed again so that no
    half-initialised module is left behind.

    Raises:
        RuntimeError: If no import spec/loader can be created for the file.
        Exception: Whatever the module's own top-level code raises.
    """
    spec = importlib.util.spec_from_file_location(module_name, file_path)
    if spec is None or spec.loader is None:
        raise RuntimeError(f"Unable to load module from {file_path}")

    module = importlib.util.module_from_spec(spec)
    sys.modules[module_name] = module
    try:
        spec.loader.exec_module(module)
    except BaseException:
        # Mirror the regular import machinery: a module whose body raised
        # must not stay importable under its name.
        sys.modules.pop(module_name, None)
        raise
    return module


def get_page_importer_module() -> ModuleType:
    """Load ``Page Importer/app.py`` after putting its folder on ``sys.path``."""
    importer_dir = str(PAGE_IMPORTER_DIR)
    if importer_dir not in sys.path:
        sys.path.insert(0, importer_dir)
    return load_module("page_importer_streamlit", PAGE_IMPORTER_DIR / "app.py")


def get_sitemap_module() -> ModuleType:
    """Load the sitemap builder script as a module."""
    return load_module("sitemap_builder_module", SITEMAP_BUILDER_PATH)


def sanitize_job_name(value: str) -> str:
    """Turn a user-supplied job name into a safe file stem.

    Every run of characters outside ``A-Za-z0-9._-`` becomes a single ``-``,
    leading/trailing dots and dashes are removed, and ``"sitemap"`` is
    returned when nothing is left.
    """
    cleaned = _UNSAFE_NAME_CHARS.sub("-", (value or "").strip())
    cleaned = cleaned.strip(".-")
    return cleaned or "sitemap"


def read_csv_preview(csv_bytes: bytes, limit: int = 200) -> list[dict[str, str]]:
    """Parse at most *limit* data rows from CSV bytes into dictionaries.

    The bytes are decoded as UTF-8 (a leading BOM is dropped, undecodable
    bytes are replaced). A non-positive *limit* yields an empty list.
    """
    text = csv_bytes.decode("utf-8-sig", errors="replace")
    reader = csv.DictReader(io.StringIO(text))
    # islice rejects negative stops, so clamp to keep "no rows" semantics.
    return [dict(row) for row in islice(reader, max(limit, 0))]


class StreamlitOutputBuffer(io.TextIOBase):
    """Text stream that mirrors the most recent written lines into Streamlit.

    Only the last ``max_lines`` completed lines (plus any unfinished line)
    are kept. The view is redrawn inside ``placeholder`` whenever a line is
    completed or at least ``throttle_seconds`` have passed since the last
    redraw.
    """

    def __init__(
        self,
        placeholder,
        *,
        height: int = 220,
        throttle_seconds: float = 0.2,
        max_lines: int = 20,
    ) -> None:
        super().__init__()
        self.placeholder = placeholder
        self.height = height
        self.throttle_seconds = throttle_seconds
        self.max_lines = max_lines
        self.lines: deque[str] = deque(maxlen=max_lines)
        self.current_line = ""
        self.last_render = 0.0

    def writable(self) -> bool:
        """Report that the stream accepts ``write`` calls."""
        return True

    def write(self, text: str) -> int:
        """Append *text* to the buffer and redraw when appropriate.

        ``\\r\\n`` and bare ``\\r`` are treated as line breaks. Returns the
        number of characters received.
        """
        if not text:
            return 0

        normalized = text.replace("\r\n", "\n").replace("\r", "\n")
        for chunk in normalized.splitlines(keepends=True):
            if chunk.endswith("\n"):
                self.current_line += chunk[:-1]
                self.lines.append(self.current_line)
                self.current_line = ""
            else:
                self.current_line += chunk

        # Check the normalised text so a carriage-return-terminated line
        # triggers a redraw just like a newline-terminated one.
        throttle_elapsed = (time.monotonic() - self.last_render) >= self.throttle_seconds
        if "\n" in normalized or throttle_elapsed:
            self.render()
        return len(text)

    def flush(self) -> None:
        """Force a redraw of the current buffer contents."""
        self.render()

    def _visible_lines(self) -> list[str]:
        """Return the retained lines plus the unfinished line, newest last."""
        visible = list(self.lines)
        if self.current_line:
            visible.append(self.current_line)
        return visible[-self.max_lines :]

    def render(self) -> None:
        """Redraw the caption and the log view inside the placeholder."""
        self.last_render = time.monotonic()
        content = html.escape("\n".join(self._visible_lines()))
        # <pre> keeps the line breaks; without it the browser collapses the
        # newline-joined text into a single paragraph.
        markup = f'<pre style="margin:0;white-space:pre-wrap;">{content}</pre>'
        # Draw both elements inside the placeholder so each redraw replaces
        # the previous one instead of appending another iframe to the page.
        with self.placeholder.container():
            st.caption("Scan Details")
            components.html(markup, height=self.height + 16)

    def getvalue(self) -> str:
        """Return the currently retained output as one newline-joined string."""
        return "\n".join(self._visible_lines())


def _summarize_crawl_result(result: Any, stdout_text: str) -> dict[str, Any]:
    """Build the session-state payload describing a finished crawl."""
    state = result.state
    return {
        "summary": {
            "records": len(state.records),
            "visited": len(state.visited),
            "queued": len(state.queue),
            "errors": len(state.errors),
            "skipped": state.skipped_count,
            "from_sitemaps": state.discovered_from_sitemaps,
            "user_stopped": result.user_stopped,
            "max_pages": result.max_pages,
            "workers": result.workers,
        },
        "output_path": str(result.output_path),
        "state_path": str(result.state_path),
        "log_path": str(result.log_path),
        "stdout": stdout_text,
    }


def _render_crawl_result(sitemap_builder: ModuleType, result_data: dict[str, Any]) -> None:
    """Show metrics, downloads, cleanup and log output for a stored crawl."""
    summary = result_data["summary"]
    csv_path = Path(result_data["output_path"])
    state_path = Path(result_data["state_path"])
    log_path = Path(result_data["log_path"])

    st.subheader("Crawl Summary")
    metric_cols = st.columns(6)
    metric_cols[0].metric("URLs Found", summary["records"])
    metric_cols[1].metric("Visited", summary["visited"])
    metric_cols[2].metric("Queued", summary["queued"])
    metric_cols[3].metric("XML Seeded", summary["from_sitemaps"])
    metric_cols[4].metric("Errors", summary["errors"])
    metric_cols[5].metric("Skipped", summary["skipped"])

    status_text = "Stopped by user." if summary["user_stopped"] else "Run completed."
    st.caption(f"{status_text} Max pages used: {summary['max_pages']} | Worker threads: {summary['workers']}")

    if csv_path.exists():
        csv_bytes = csv_path.read_bytes()
        st.download_button(
            "Download Sitemap CSV",
            data=csv_bytes,
            file_name=csv_path.name,
            mime="text/csv",
        )
        preview_rows = read_csv_preview(csv_bytes)
        if preview_rows:
            st.dataframe(preview_rows, width="stretch", hide_index=True)

    file_cols = st.columns(2)
    with file_cols[0]:
        if state_path.exists():
            st.download_button(
                "Download Crawl State",
                data=state_path.read_bytes(),
                file_name=state_path.name,
                mime="application/json",
            )
    with file_cols[1]:
        if log_path.exists():
            st.download_button(
                "Download Crawl Log",
                data=log_path.read_bytes(),
                file_name=log_path.name,
                mime="text/plain",
            )

    cleanup_targets = [path for path in (csv_path, state_path, log_path) if path.exists()]
    if cleanup_targets:
        st.caption("Cleanup removes the sitemap CSV, crawl state, and crawl log for this run.")
        if st.button("Delete Crawl Files"):
            removed_paths = sitemap_builder.cleanup_run_files(csv_path)
            st.session_state.pop("sitemap_result", None)
            if removed_paths:
                removed_names = ", ".join(path.name for path in removed_paths)
                st.success(f"Deleted: {removed_names}")
            else:
                st.info("No crawl files were present to delete.")
            # The files are gone, so skip the log views below.
            return

    crawl_output = (result_data.get("stdout") or "").strip()
    if crawl_output:
        st.text_area("Scan Details", value=crawl_output, height=220, disabled=True)

    if log_path.exists():
        log_text = log_path.read_text(encoding="utf-8", errors="replace")
        st.text_area("Log Tail", value="\n".join(log_text.splitlines()[-50:]), height=220, disabled=True)


def render_sitemap_tab() -> None:
    """Render the sitemap crawl form, run a crawl on submit, and show results.

    The outcome of a successful crawl is stored in
    ``st.session_state["sitemap_result"]`` so it stays visible across reruns.
    """
    st.title("Sitemap Generator")
    st.caption("Crawl a site, export a sitemap CSV, and keep resume data inside the container data volume.")

    SITEMAP_OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
    sitemap_builder = get_sitemap_module()
    default_workers = sitemap_builder.DEFAULT_WORKERS

    # NOTE(review): the widget-to-column grouping below was reconstructed from
    # whitespace-collapsed source — confirm it matches the intended layout.
    with st.form("sitemap-form"):
        start_url = st.text_input("Starting URL", placeholder="https://example.com")
        job_name = st.text_input(
            "Output name",
            value="sitemap",
            help="Used for the CSV, crawl state, and log file names.",
        )
        col1, col2, col3 = st.columns(3)
        with col1:
            max_pages = st.number_input("Max pages", min_value=1, value=10000, step=100)
            workers = st.number_input(
                "Worker threads",
                min_value=1,
                value=default_workers,
                step=1,
                help="Defaults to the number of CPUs visible inside the Docker container.",
            )
        with col2:
            delay = st.number_input("Delay between requests (seconds)", min_value=0.0, value=0.0, step=0.25)
            timeout = st.number_input("Request timeout (seconds)", min_value=1.0, value=15.0, step=1.0)
        with col3:
            save_every = st.number_input("Save progress every N pages", min_value=1, value=25, step=1)
            include_subdomains = st.checkbox("Include subdomains", value=False)
            include_documents = st.checkbox("Include document links", value=False)
        resume_existing = st.checkbox("Resume from saved crawl state if present", value=True)
        start_fresh = st.checkbox("Ignore any saved crawl state and start fresh", value=False)
        submitted = st.form_submit_button("Run Sitemap Crawl", type="primary")

    if submitted:
        # Validate and crawl the same, whitespace-trimmed value.
        cleaned_url = start_url.strip()
        if not cleaned_url:
            st.error("Starting URL is required.")
        else:
            safe_name = sanitize_job_name(job_name)
            output_path = SITEMAP_OUTPUT_DIR / f"{safe_name}.csv"
            output_placeholder = st.empty()
            captured_stdout = StreamlitOutputBuffer(output_placeholder)
            captured_stdout.render()
            try:
                with st.spinner("Running sitemap crawl..."):
                    with contextlib.redirect_stdout(captured_stdout):
                        result = sitemap_builder.run_crawl(
                            start_url=cleaned_url,
                            output_path=output_path,
                            max_pages=int(max_pages),
                            delay=float(delay),
                            timeout=float(timeout),
                            include_subdomains=include_subdomains,
                            include_documents=include_documents,
                            save_every=int(save_every),
                            workers=int(workers),
                            resume=resume_existing,
                            fresh=start_fresh,
                        )
                captured_stdout.flush()
            except Exception as exc:
                # UI boundary: surface the failure to the user instead of
                # crashing the page.
                captured_stdout.flush()
                st.error(str(exc))
            else:
                st.session_state["sitemap_result"] = _summarize_crawl_result(
                    result, captured_stdout.getvalue()
                )

    result_data = st.session_state.get("sitemap_result")
    if not result_data:
        st.info("Run a crawl to generate a sitemap CSV.")
        return

    _render_crawl_result(sitemap_builder, result_data)


def main() -> None:
    """Configure the page and dispatch to the tool selected by the user."""
    st.set_page_config(page_title="WDW Tools", layout="wide")
    st.header("WDW Sitemap And Import Tools")
    selected_tool = st.radio(
        "Tool",
        ["Sitemap Generator", "Page Importer"],
        horizontal=True,
        label_visibility="collapsed",
    )
    if selected_tool == "Sitemap Generator":
        render_sitemap_tab()
    else:
        page_importer_app = get_page_importer_module()
        page_importer_app.render_app()


if __name__ == "__main__":
    main()