Files
wdwalrus 9e87bc4453
Build Docker Image / docker (push) Successful in 6s
Fix UI issue
2026-04-09 11:35:04 -07:00

311 lines
12 KiB
Python

from __future__ import annotations
import contextlib
import csv
import html
import importlib.util
import io
import os
import re
import sys
import time
from collections import deque
from pathlib import Path
import streamlit as st
import streamlit.components.v1 as components
# Directory that contains this file; the tool locations below are built from it.
ROOT_DIR = Path(__file__).resolve().parent
# Folder of the Page Importer tool (its app.py is loaded from here).
PAGE_IMPORTER_DIR = ROOT_DIR / "Page Importer"
# Source file of the sitemap builder, loaded as a module at runtime.
SITEMAP_BUILDER_PATH = ROOT_DIR / "Sitemap Builder" / "sitemap_builder.py"
# Data root: the APP_DATA_DIR environment variable when set, otherwise
# ROOT_DIR/.data; always resolved to an absolute path.
APP_DATA_DIR = Path(os.environ.get("APP_DATA_DIR", ROOT_DIR / ".data")).resolve()
# Directory under the data root where sitemap CSV output paths are placed.
SITEMAP_OUTPUT_DIR = APP_DATA_DIR / "sitemaps"
def load_module(module_name: str, file_path: Path):
    """Load the Python source file at *file_path* as a module named *module_name*.

    The module is registered in ``sys.modules`` under *module_name* before it is
    executed and is returned once execution succeeds. Every call executes the
    file again, even if a module of that name is already registered.

    Raises:
        RuntimeError: if no import spec (or no loader) can be built for the path.
        Any exception raised while executing the module body is propagated; in
        that case the module is removed from ``sys.modules`` first.
    """
    spec = importlib.util.spec_from_file_location(module_name, file_path)
    if spec is None or spec.loader is None:
        raise RuntimeError(f"Unable to load module from {file_path}")
    module = importlib.util.module_from_spec(spec)
    # Register before executing so the module can be looked up by name while
    # its own body is still running.
    sys.modules[module_name] = module
    try:
        spec.loader.exec_module(module)
    except BaseException:
        # Do not leave a half-initialised module registered: later lookups by
        # name would otherwise receive a module missing part of its contents.
        sys.modules.pop(module_name, None)
        raise
    return module
def get_page_importer_module():
    """Load the Page Importer's ``app.py`` and return it as a module.

    The Page Importer folder is put at the front of ``sys.path`` (once) before
    the file is loaded under the name ``page_importer_streamlit``.
    """
    importer_dir = str(PAGE_IMPORTER_DIR)
    already_on_path = importer_dir in sys.path
    if not already_on_path:
        sys.path.insert(0, importer_dir)
    app_file = PAGE_IMPORTER_DIR / "app.py"
    return load_module("page_importer_streamlit", app_file)
def get_sitemap_module():
    """Load the sitemap builder source file and return it as a module.

    The file at ``SITEMAP_BUILDER_PATH`` is loaded under the module name
    ``sitemap_builder_module``.
    """
    builder_module = load_module("sitemap_builder_module", SITEMAP_BUILDER_PATH)
    return builder_module
def sanitize_job_name(value: str) -> str:
    """Turn a user-supplied job name into a safe file-name stem.

    Surrounding whitespace is dropped, every run of characters other than
    ASCII letters, digits, ``.``, ``_`` and ``-`` becomes a single ``-``, and
    leading/trailing ``.`` and ``-`` are removed. An empty result (including a
    falsy *value*) falls back to ``"sitemap"``.
    """
    raw = value.strip() if value else ""
    slug = re.sub(r"[^A-Za-z0-9._-]+", "-", raw).strip(".-")
    if slug:
        return slug
    return "sitemap"
def read_csv_preview(csv_bytes: bytes, limit: int = 200) -> list[dict[str, str]]:
    """Return up to *limit* leading data rows of a CSV payload as dicts.

    The bytes are decoded as UTF-8 (a leading BOM is dropped, undecodable
    bytes are replaced) and the first line is used as the header row.
    """
    decoded = csv_bytes.decode("utf-8-sig", errors="replace")
    records = csv.DictReader(io.StringIO(decoded))
    # Pairing with range(limit) caps how many rows are taken from the reader.
    return [dict(record) for _, record in zip(range(limit), records)]
class StreamlitOutputBuffer(io.TextIOBase):
    """Write-only text stream that shows its most recent lines in a Streamlit placeholder.

    Text written to the stream is split into lines; only the last ``max_lines``
    lines (plus any unfinished line) are kept and drawn as an auto-scrolling
    HTML box inside ``placeholder``.

    Args:
        placeholder: object whose ``container()`` context manager receives the
            rendered output (an ``st.empty()`` placeholder in this file).
        height: height of the output box in pixels.
        throttle_seconds: minimum time between renders for writes whose text
            contains no ``"\\n"``.
        max_lines: number of trailing lines that are retained and displayed.
    """

    # Class-level default so flush() also works on instances built without __init__.
    _closing = False

    def __init__(self, placeholder, *, height: int = 220, throttle_seconds: float = 0.2, max_lines: int = 20) -> None:
        super().__init__()
        self.placeholder = placeholder
        self.height = height
        self.throttle_seconds = throttle_seconds
        self.max_lines = max_lines
        self.lines: deque[str] = deque(maxlen=max_lines)
        self.current_line = ""
        self.last_render = 0.0

    def writable(self) -> bool:
        """Report the stream as writable (the ``io.IOBase`` default is False)."""
        return True

    def write(self, text: str) -> int:
        """Append *text* to the buffer and return the number of characters accepted.

        ``"\\r\\n"`` and bare ``"\\r"`` are treated as line breaks. A render is
        issued immediately when the raw text contains ``"\\n"``; otherwise it is
        issued only if ``throttle_seconds`` have passed since the last render.
        """
        if not text:
            return 0
        normalized = text.replace("\r\n", "\n").replace("\r", "\n")
        # Everything before the final "\n" completes lines; the remainder stays
        # pending as the unfinished current line.
        *completed, self.current_line = (self.current_line + normalized).split("\n")
        self.lines.extend(completed)
        # NOTE(review): no locking is done here; if stdout is redirected to this
        # object while other threads print, writes can interleave — confirm
        # callers only print from the Streamlit script thread.
        now = time.monotonic()
        if "\n" in text or (now - self.last_render) >= self.throttle_seconds:
            self.render()
        return len(text)

    def flush(self) -> None:
        """Render the current buffer contents, except while the stream is closing."""
        if self._closing:
            return
        self.render()

    def close(self) -> None:
        """Close the stream without drawing.

        ``io.IOBase.close()`` flushes first, and the default finalizer calls
        ``close()``; without this guard, discarding the buffer would trigger a
        Streamlit render at garbage-collection time.
        """
        self._closing = True
        super().close()

    def _visible_lines(self) -> list[str]:
        """Return the retained lines plus the unfinished line, capped at ``max_lines``."""
        if self.max_lines <= 0:
            # A "[-0:]" slice would return the whole list instead of nothing.
            return []
        visible = list(self.lines)
        if self.current_line:
            visible.append(self.current_line)
        return visible[-self.max_lines :]

    def render(self) -> None:
        """Draw the visible lines into the placeholder as an HTML box scrolled to the bottom."""
        self.last_render = time.monotonic()
        # Escape the text because it is embedded directly into HTML markup.
        content = html.escape("\n".join(self._visible_lines()))
        with self.placeholder.container():
            st.caption("Scan Details")
            # The markup is kept flush-left so the emitted HTML string is unchanged.
            components.html(
                f"""
<div id="scan-details" style="
height: {self.height}px;
overflow-y: auto;
white-space: pre-wrap;
font-family: monospace;
font-size: 0.9rem;
padding: 0.75rem;
border: 1px solid rgba(49, 51, 63, 0.2);
border-radius: 0.5rem;
background: white;
">{content}</div>
<script>
const el = document.getElementById("scan-details");
if (el) {{
el.scrollTop = el.scrollHeight;
}}
</script>
""",
                height=self.height + 16,
            )

    def getvalue(self) -> str:
        """Return the currently visible lines joined with newlines.

        Only the retained tail (at most ``max_lines`` lines) is returned, not
        everything that was ever written.
        """
        return "\n".join(self._visible_lines())
def _render_sitemap_form(default_workers):
    """Draw the crawl settings form and return ``(submitted, settings)``.

    ``settings`` holds the raw "start_url" and "job_name" text plus the crawl
    options already converted to the types passed to ``run_crawl``.
    """
    with st.form("sitemap-form"):
        start_url = st.text_input("Starting URL", placeholder="https://example.com")
        job_name = st.text_input(
            "Output name",
            value="sitemap",
            help="Used for the CSV, crawl state, and log file names.",
        )
        col1, col2, col3 = st.columns(3)
        with col1:
            max_pages = st.number_input("Max pages", min_value=1, value=10000, step=100)
            workers = st.number_input(
                "Worker threads",
                min_value=1,
                value=default_workers,
                step=1,
                help="Defaults to the number of CPUs visible inside the Docker container.",
            )
        with col2:
            delay = st.number_input("Delay between requests (seconds)", min_value=0.0, value=0.0, step=0.25)
            timeout = st.number_input("Request timeout (seconds)", min_value=1.0, value=15.0, step=1.0)
        # NOTE(review): whether the two "Include ..." checkboxes sit inside the
        # third column or below the columns is not certain — confirm the layout.
        with col3:
            save_every = st.number_input("Save progress every N pages", min_value=1, value=25, step=1)
            include_subdomains = st.checkbox("Include subdomains", value=False)
            include_documents = st.checkbox("Include document links", value=False)
        resume_existing = st.checkbox("Resume from saved crawl state if present", value=True)
        start_fresh = st.checkbox("Ignore any saved crawl state and start fresh", value=False)
        submitted = st.form_submit_button("Run Sitemap Crawl", type="primary")
    settings = {
        "start_url": start_url,
        "job_name": job_name,
        "max_pages": int(max_pages),
        "delay": float(delay),
        "timeout": float(timeout),
        "include_subdomains": include_subdomains,
        "include_documents": include_documents,
        "save_every": int(save_every),
        "workers": int(workers),
        "resume": resume_existing,
        "fresh": start_fresh,
    }
    return submitted, settings


def _run_sitemap_crawl(sitemap_builder, settings) -> None:
    """Run one crawl with live output and store its outcome in session state.

    Shows an error instead of crawling when the starting URL is blank. Any
    exception raised by the crawl is shown as an error message and leaves the
    previously stored result (if any) untouched.
    """
    # Validate and crawl the same, whitespace-trimmed URL.
    start_url = settings["start_url"].strip()
    if not start_url:
        st.error("Starting URL is required.")
        return
    safe_name = sanitize_job_name(settings["job_name"])
    output_path = SITEMAP_OUTPUT_DIR / f"{safe_name}.csv"
    output_placeholder = st.empty()
    captured_stdout = StreamlitOutputBuffer(output_placeholder)
    # Draw the (empty) output box right away so it is visible while crawling.
    captured_stdout.render()
    try:
        with st.spinner("Running sitemap crawl..."):
            # Everything the crawler prints is mirrored into the output box.
            with contextlib.redirect_stdout(captured_stdout):
                result = sitemap_builder.run_crawl(
                    start_url=start_url,
                    output_path=output_path,
                    max_pages=settings["max_pages"],
                    delay=settings["delay"],
                    timeout=settings["timeout"],
                    include_subdomains=settings["include_subdomains"],
                    include_documents=settings["include_documents"],
                    save_every=settings["save_every"],
                    workers=settings["workers"],
                    resume=settings["resume"],
                    fresh=settings["fresh"],
                )
        captured_stdout.flush()
    except Exception as exc:
        # UI boundary: surface the failure to the user instead of crashing the page.
        captured_stdout.flush()
        st.error(str(exc))
    else:
        # Keep only plain values so the result can be shown again on later reruns.
        st.session_state["sitemap_result"] = {
            "summary": {
                "records": len(result.state.records),
                "visited": len(result.state.visited),
                "queued": len(result.state.queue),
                "errors": len(result.state.errors),
                "skipped": result.state.skipped_count,
                "from_sitemaps": result.state.discovered_from_sitemaps,
                "user_stopped": result.user_stopped,
                "max_pages": result.max_pages,
                "workers": result.workers,
            },
            "output_path": str(result.output_path),
            "state_path": str(result.state_path),
            "log_path": str(result.log_path),
            "stdout": captured_stdout.getvalue(),
        }


def _render_sitemap_results(sitemap_builder) -> None:
    """Show the stored crawl result: summary, downloads, cleanup button and logs."""
    result_data = st.session_state.get("sitemap_result")
    if not result_data:
        st.info("Run a crawl to generate a sitemap CSV.")
        return
    summary = result_data["summary"]
    csv_path = Path(result_data["output_path"])
    state_path = Path(result_data["state_path"])
    log_path = Path(result_data["log_path"])

    st.subheader("Crawl Summary")
    metrics = (
        ("URLs Found", "records"),
        ("Visited", "visited"),
        ("Queued", "queued"),
        ("XML Seeded", "from_sitemaps"),
        ("Errors", "errors"),
        ("Skipped", "skipped"),
    )
    for column, (label, key) in zip(st.columns(len(metrics)), metrics):
        column.metric(label, summary[key])
    status_text = "Stopped by user." if summary["user_stopped"] else "Run completed."
    st.caption(f"{status_text} Max pages used: {summary['max_pages']} | Worker threads: {summary['workers']}")

    if csv_path.exists():
        csv_bytes = csv_path.read_bytes()
        st.download_button(
            "Download Sitemap CSV",
            data=csv_bytes,
            file_name=csv_path.name,
            mime="text/csv",
        )
        preview_rows = read_csv_preview(csv_bytes)
        if preview_rows:
            st.dataframe(preview_rows, width="stretch", hide_index=True)

    file_cols = st.columns(2)
    with file_cols[0]:
        if state_path.exists():
            st.download_button(
                "Download Crawl State",
                data=state_path.read_bytes(),
                file_name=state_path.name,
                mime="application/json",
            )
    with file_cols[1]:
        if log_path.exists():
            st.download_button(
                "Download Crawl Log",
                data=log_path.read_bytes(),
                file_name=log_path.name,
                mime="text/plain",
            )

    cleanup_targets = [path for path in (csv_path, state_path, log_path) if path.exists()]
    if cleanup_targets:
        st.caption("Cleanup removes the sitemap CSV, crawl state, and crawl log for this run.")
        if st.button("Delete Crawl Files"):
            removed_paths = sitemap_builder.cleanup_run_files(csv_path)
            st.session_state.pop("sitemap_result", None)
            if removed_paths:
                removed_names = ", ".join(path.name for path in removed_paths)
                st.success(f"Deleted: {removed_names}")
            else:
                st.info("No crawl files were present to delete.")
            # The files are gone, so skip the output and log sections below.
            return

    crawl_output = (result_data.get("stdout") or "").strip()
    if crawl_output:
        st.text_area("Scan Details", value=crawl_output, height=220, disabled=True)
    if log_path.exists():
        log_text = log_path.read_text(encoding="utf-8", errors="replace")
        st.text_area("Log Tail", value="\n".join(log_text.splitlines()[-50:]), height=220, disabled=True)


def render_sitemap_tab() -> None:
    """Render the Sitemap Generator page.

    Draws the settings form, runs a crawl when the form is submitted, and then
    shows the most recent crawl result kept in ``st.session_state``.
    """
    st.title("Sitemap Generator")
    st.caption("Crawl a site, export a sitemap CSV, and keep resume data inside the container data volume.")
    SITEMAP_OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
    sitemap_builder = get_sitemap_module()
    submitted, settings = _render_sitemap_form(sitemap_builder.DEFAULT_WORKERS)
    if submitted:
        _run_sitemap_crawl(sitemap_builder, settings)
    _render_sitemap_results(sitemap_builder)
def main() -> None:
    """Configure the page, show the tool selector and render the chosen tool."""
    st.set_page_config(page_title="WDW Tools", layout="wide")
    st.header("WDW Sitemap And Import Tools")
    tool_names = ["Sitemap Generator", "Page Importer"]
    selected_tool = st.radio(
        "Tool",
        tool_names,
        horizontal=True,
        label_visibility="collapsed",
    )
    # Anything other than the sitemap choice falls through to the Page Importer.
    if selected_tool != "Sitemap Generator":
        page_importer_app = get_page_importer_module()
        page_importer_app.render_app()
        return
    render_sitemap_tab()
# Entry point: call main() when this file is executed as a script rather than imported.
if __name__ == "__main__":
    main()