311 lines
12 KiB
Python
311 lines
12 KiB
Python
from __future__ import annotations
|
|
|
|
import contextlib
|
|
import csv
|
|
import html
|
|
import importlib.util
|
|
import io
|
|
import os
|
|
import re
|
|
import sys
|
|
import time
|
|
from collections import deque
|
|
from pathlib import Path
|
|
|
|
import streamlit as st
|
|
import streamlit.components.v1 as components
|
|
|
|
|
|
# Directory that contains this script; every bundled tool lives beneath it.
ROOT_DIR = Path(__file__).resolve().parent
# Folder holding the Page Importer Streamlit app.
PAGE_IMPORTER_DIR = ROOT_DIR / "Page Importer"
# Source file of the sitemap crawler, loaded dynamically at run time.
SITEMAP_BUILDER_PATH = ROOT_DIR / "Sitemap Builder" / "sitemap_builder.py"
# Writable data root. ``or`` (rather than a ``get`` default) makes an empty
# APP_DATA_DIR variable fall back to ``.data`` instead of resolving to the
# current working directory.
APP_DATA_DIR = Path(os.environ.get("APP_DATA_DIR") or ROOT_DIR / ".data").resolve()
# Where sitemap CSVs are written.
SITEMAP_OUTPUT_DIR = APP_DATA_DIR / "sitemaps"
|
|
|
|
|
|
def load_module(module_name: str, file_path: Path):
    """Execute the Python file at ``file_path`` and register it as ``module_name``.

    The file is executed afresh on every call; on success the new module
    replaces any entry of the same name in ``sys.modules``.

    Args:
        module_name: Key under which the module is stored in ``sys.modules``.
        file_path: Location of the source file to execute.

    Returns:
        The freshly executed module object.

    Raises:
        RuntimeError: If no import spec or loader can be created for
            ``file_path``.
        Exception: Anything raised by the module's own top-level code. The
            ``sys.modules`` entry is restored to its prior state first.
    """
    spec = importlib.util.spec_from_file_location(module_name, file_path)
    if spec is None or spec.loader is None:
        raise RuntimeError(f"Unable to load module from {file_path}")

    module = importlib.util.module_from_spec(spec)
    previous = sys.modules.get(module_name)
    # Register before executing so the module is reachable by name while its
    # own body is still running.
    sys.modules[module_name] = module
    try:
        spec.loader.exec_module(module)
    except BaseException:
        # Never leave a half-initialised module behind for later lookups.
        if previous is None:
            sys.modules.pop(module_name, None)
        else:
            sys.modules[module_name] = previous
        raise
    return module
|
|
|
|
|
|
def get_page_importer_module():
    """Load the Page Importer app (``app.py``) and return it as a module.

    ``PAGE_IMPORTER_DIR`` is inserted at the front of ``sys.path`` if it is
    not already listed, before the app file is executed.
    """
    # NOTE(review): presumably needed so app.py can import its sibling
    # modules by bare name -- confirm against the Page Importer sources.
    importer_dir = str(PAGE_IMPORTER_DIR)
    if importer_dir not in sys.path:
        sys.path.insert(0, importer_dir)
    return load_module("page_importer_streamlit", PAGE_IMPORTER_DIR / "app.py")
|
|
|
|
|
|
def get_sitemap_module():
    """Load the sitemap builder script at ``SITEMAP_BUILDER_PATH`` as a module."""
    return load_module("sitemap_builder_module", SITEMAP_BUILDER_PATH)
|
|
|
|
|
|
def sanitize_job_name(value: str) -> str:
    """Reduce a user-supplied job name to a safe file-name stem.

    Surrounding whitespace is removed, every run of characters outside
    ``A-Z a-z 0-9 . _ -`` collapses to a single ``-``, and leading/trailing
    dots and dashes are stripped. Path separators are among the replaced
    characters, so the result can never point outside its directory.

    Args:
        value: Raw name typed by the user; ``None`` or an empty string is
            accepted.

    Returns:
        The cleaned name, or ``"sitemap"`` when nothing usable remains.
    """
    collapsed = re.sub(r"[^A-Za-z0-9._-]+", "-", (value or "").strip())
    return collapsed.strip(".-") or "sitemap"
|
|
|
|
|
|
def read_csv_preview(csv_bytes: bytes, limit: int = 200) -> list[dict[str, str]]:
    """Parse the first ``limit`` data rows of a CSV payload.

    The bytes are decoded as UTF-8 (a leading BOM is dropped and undecodable
    bytes are replaced) and the first line is used as the header row.

    Args:
        csv_bytes: Raw CSV file content.
        limit: Maximum number of data rows to return; zero or a negative
            value yields an empty list.

    Returns:
        One dict per data row, keyed by column header. As with any
        ``csv.DictReader``, a short row has ``None`` for its missing columns
        and surplus cells of a long row are collected under the key ``None``.
    """
    text = csv_bytes.decode("utf-8-sig", errors="replace")
    reader = csv.DictReader(io.StringIO(text))
    # ``range`` comes first in the zip so the reader is never advanced past
    # the requested number of rows.
    return [dict(row) for _, row in zip(range(limit), reader)]
|
|
|
|
|
|
class StreamlitOutputBuffer(io.TextIOBase):
    """Write-only text stream that mirrors its tail into a Streamlit placeholder.

    Text written to the stream is split into lines; only the most recent
    ``max_lines`` lines are kept. Both ``"\\r\\n"`` and a bare ``"\\r"`` count
    as line breaks. The kept lines are drawn as an auto-scrolling HTML panel
    whenever a line is completed, or when at least ``throttle_seconds`` have
    passed since the previous draw.

    Args:
        placeholder: Object exposing ``container()`` as a context manager
            (e.g. the result of ``st.empty()``). ``None`` turns the instance
            into a capture-only buffer that never draws.
        height: Height of the scrolling panel in pixels.
        throttle_seconds: Minimum gap between redraws triggered by writes
            that do not complete a line.
        max_lines: Number of trailing lines retained and shown.
    """

    def __init__(self, placeholder, *, height: int = 220, throttle_seconds: float = 0.2, max_lines: int = 20) -> None:
        super().__init__()
        self.placeholder = placeholder
        self.height = height
        self.throttle_seconds = throttle_seconds
        self.max_lines = max_lines
        # Completed lines; the deque silently drops the oldest beyond max_lines.
        self.lines: deque[str] = deque(maxlen=max_lines)
        # Text received since the last line break.
        self.current_line = ""
        # time.monotonic() stamp of the latest render() call.
        self.last_render = 0.0

    def writable(self) -> bool:
        """Report that the stream accepts writes."""
        return True

    def write(self, text: str) -> int:
        """Buffer ``text`` and redraw when a line completes or the throttle allows.

        Returns:
            The number of characters accepted, i.e. ``len(text)``.
        """
        if not text:
            return 0

        normalized = text.replace("\r\n", "\n").replace("\r", "\n")
        # Everything before the last "\n" finishes a line; the rest stays pending.
        *finished, pending = normalized.split("\n")
        for piece in finished:
            self.lines.append(self.current_line + piece)
            self.current_line = ""
        self.current_line += pending

        throttle_elapsed = (time.monotonic() - self.last_render) >= self.throttle_seconds
        # ``finished`` is derived from the normalised text, so a bare "\r"
        # forces a redraw exactly like "\n" does.
        if finished or throttle_elapsed:
            self.render()
        return len(text)

    def flush(self) -> None:
        """Redraw immediately with whatever is currently buffered."""
        self.render()

    def _visible_lines(self) -> list[str]:
        """Return the retained lines plus any pending partial line, newest last."""
        tail = list(self.lines)
        if self.current_line:
            tail.append(self.current_line)
        return tail[-self.max_lines :]

    def render(self) -> None:
        """Draw the buffered tail into the placeholder and scroll it to the end."""
        self.last_render = time.monotonic()
        if self.placeholder is None:
            return

        # Escape so crawler output cannot inject markup into the panel.
        content = html.escape("\n".join(self._visible_lines()))
        with self.placeholder.container():
            st.caption("Scan Details")
            components.html(
                f"""
                <div id="scan-details" style="
                    height: {self.height}px;
                    overflow-y: auto;
                    white-space: pre-wrap;
                    font-family: monospace;
                    font-size: 0.9rem;
                    padding: 0.75rem;
                    border: 1px solid rgba(49, 51, 63, 0.2);
                    border-radius: 0.5rem;
                    background: white;
                ">{content}</div>
                <script>
                    const el = document.getElementById("scan-details");
                    if (el) {{
                        el.scrollTop = el.scrollHeight;
                    }}
                </script>
                """,
                height=self.height + 16,
            )

    def getvalue(self) -> str:
        """Return the retained tail of the output as one newline-joined string."""
        return "\n".join(self._visible_lines())
|
|
|
|
|
|
def _run_sitemap_crawl(sitemap_builder, job_name: str, crawl_options: dict) -> None:
    """Run one crawl with live output and record its outcome in session state.

    Args:
        sitemap_builder: Loaded crawler module; must expose ``run_crawl``.
        job_name: Raw output name from the form; sanitised before use.
        crawl_options: Keyword arguments forwarded to ``run_crawl`` in
            addition to ``output_path``.

    On success the summary, file paths, and captured output are stored under
    ``st.session_state["sitemap_result"]``. On failure the error message is
    shown and any previously stored result is left as it was.
    """
    output_path = SITEMAP_OUTPUT_DIR / f"{sanitize_job_name(job_name)}.csv"
    captured_stdout = StreamlitOutputBuffer(st.empty())
    # Draw the (still empty) details panel before the crawl starts printing.
    captured_stdout.render()

    try:
        with st.spinner("Running sitemap crawl..."):
            with contextlib.redirect_stdout(captured_stdout):
                result = sitemap_builder.run_crawl(output_path=output_path, **crawl_options)
        captured_stdout.flush()
    except Exception as exc:
        # UI boundary: report the failure on the page instead of crashing the app.
        captured_stdout.flush()
        st.error(str(exc))
        return

    state = result.state
    st.session_state["sitemap_result"] = {
        "summary": {
            "records": len(state.records),
            "visited": len(state.visited),
            "queued": len(state.queue),
            "errors": len(state.errors),
            "skipped": state.skipped_count,
            "from_sitemaps": state.discovered_from_sitemaps,
            "user_stopped": result.user_stopped,
            "max_pages": result.max_pages,
            "workers": result.workers,
        },
        "output_path": str(result.output_path),
        "state_path": str(result.state_path),
        "log_path": str(result.log_path),
        "stdout": captured_stdout.getvalue(),
    }


def _render_sitemap_results(sitemap_builder, result_data: dict) -> None:
    """Show metrics, downloads, cleanup control, and logs for a stored crawl result.

    Args:
        sitemap_builder: Loaded crawler module; must expose
            ``cleanup_run_files``.
        result_data: The dict stored under
            ``st.session_state["sitemap_result"]``.
    """
    summary = result_data["summary"]
    csv_path = Path(result_data["output_path"])
    state_path = Path(result_data["state_path"])
    log_path = Path(result_data["log_path"])

    st.subheader("Crawl Summary")
    metric_cols = st.columns(6)
    metric_cols[0].metric("URLs Found", summary["records"])
    metric_cols[1].metric("Visited", summary["visited"])
    metric_cols[2].metric("Queued", summary["queued"])
    metric_cols[3].metric("XML Seeded", summary["from_sitemaps"])
    metric_cols[4].metric("Errors", summary["errors"])
    metric_cols[5].metric("Skipped", summary["skipped"])

    status_text = "Stopped by user." if summary["user_stopped"] else "Run completed."
    st.caption(f"{status_text} Max pages used: {summary['max_pages']} | Worker threads: {summary['workers']}")

    if csv_path.exists():
        csv_bytes = csv_path.read_bytes()
        st.download_button(
            "Download Sitemap CSV",
            data=csv_bytes,
            file_name=csv_path.name,
            mime="text/csv",
        )
        preview_rows = read_csv_preview(csv_bytes)
        if preview_rows:
            st.dataframe(preview_rows, width="stretch", hide_index=True)

    file_cols = st.columns(2)
    with file_cols[0]:
        if state_path.exists():
            st.download_button(
                "Download Crawl State",
                data=state_path.read_bytes(),
                file_name=state_path.name,
                mime="application/json",
            )
    with file_cols[1]:
        if log_path.exists():
            st.download_button(
                "Download Crawl Log",
                data=log_path.read_bytes(),
                file_name=log_path.name,
                mime="text/plain",
            )

    cleanup_targets = [path for path in (csv_path, state_path, log_path) if path.exists()]
    if cleanup_targets:
        st.caption("Cleanup removes the sitemap CSV, crawl state, and crawl log for this run.")
        if st.button("Delete Crawl Files"):
            removed_paths = sitemap_builder.cleanup_run_files(csv_path)
            st.session_state.pop("sitemap_result", None)
            if removed_paths:
                removed_names = ", ".join(path.name for path in removed_paths)
                st.success(f"Deleted: {removed_names}")
            else:
                st.info("No crawl files were present to delete.")
            # The files are gone, so skip the output and log sections below.
            return

    crawl_output = (result_data.get("stdout") or "").strip()
    if crawl_output:
        st.text_area("Scan Details", value=crawl_output, height=220, disabled=True)

    if log_path.exists():
        log_text = log_path.read_text(encoding="utf-8", errors="replace")
        st.text_area("Log Tail", value="\n".join(log_text.splitlines()[-50:]), height=220, disabled=True)


def render_sitemap_tab() -> None:
    """Render the Sitemap Generator page: crawl form, live run, and results.

    The output directory is created if missing and the crawler module is
    loaded on every call. Submitting the form runs a crawl; the most recent
    successful result kept in ``st.session_state["sitemap_result"]`` is then
    displayed, or a hint is shown when there is none.
    """
    st.title("Sitemap Generator")
    st.caption("Crawl a site, export a sitemap CSV, and keep resume data inside the container data volume.")

    SITEMAP_OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
    sitemap_builder = get_sitemap_module()
    default_workers = sitemap_builder.DEFAULT_WORKERS

    with st.form("sitemap-form"):
        start_url = st.text_input("Starting URL", placeholder="https://example.com")
        job_name = st.text_input(
            "Output name",
            value="sitemap",
            help="Used for the CSV, crawl state, and log file names.",
        )

        col1, col2, col3 = st.columns(3)
        with col1:
            max_pages = st.number_input("Max pages", min_value=1, value=10000, step=100)
            workers = st.number_input(
                "Worker threads",
                min_value=1,
                value=default_workers,
                step=1,
                help="Defaults to the number of CPUs visible inside the Docker container.",
            )
        with col2:
            delay = st.number_input("Delay between requests (seconds)", min_value=0.0, value=0.0, step=0.25)
            timeout = st.number_input("Request timeout (seconds)", min_value=1.0, value=15.0, step=1.0)
        with col3:
            save_every = st.number_input("Save progress every N pages", min_value=1, value=25, step=1)
            include_subdomains = st.checkbox("Include subdomains", value=False)
            include_documents = st.checkbox("Include document links", value=False)

        # NOTE(review): both boxes can be ticked at once; how run_crawl
        # resolves resume=True together with fresh=True is not visible
        # here -- confirm in the crawler.
        resume_existing = st.checkbox("Resume from saved crawl state if present", value=True)
        start_fresh = st.checkbox("Ignore any saved crawl state and start fresh", value=False)
        submitted = st.form_submit_button("Run Sitemap Crawl", type="primary")

    if submitted:
        # Validate and crawl the same, whitespace-free value.
        target_url = start_url.strip()
        if not target_url:
            st.error("Starting URL is required.")
        else:
            _run_sitemap_crawl(
                sitemap_builder,
                job_name,
                {
                    "start_url": target_url,
                    "max_pages": int(max_pages),
                    "delay": float(delay),
                    "timeout": float(timeout),
                    "include_subdomains": include_subdomains,
                    "include_documents": include_documents,
                    "save_every": int(save_every),
                    "workers": int(workers),
                    "resume": resume_existing,
                    "fresh": start_fresh,
                },
            )

    result_data = st.session_state.get("sitemap_result")
    if not result_data:
        st.info("Run a crawl to generate a sitemap CSV.")
        return

    _render_sitemap_results(sitemap_builder, result_data)
|
|
|
|
|
|
def main() -> None:
    """Configure the page and render whichever tool the user has selected.

    The Sitemap Generator is rendered by this file; the Page Importer is
    loaded from its own app module and drawn through its ``render_app()``.
    """
    st.set_page_config(page_title="WDW Tools", layout="wide")
    st.header("WDW Sitemap And Import Tools")
    selected_tool = st.radio(
        "Tool",
        ["Sitemap Generator", "Page Importer"],
        horizontal=True,
        label_visibility="collapsed",
    )

    if selected_tool == "Sitemap Generator":
        render_sitemap_tab()
    else:
        page_importer_app = get_page_importer_module()
        page_importer_app.render_app()
|
|
|
|
|
|
# Script entry point: run the app when this file is executed directly.
if __name__ == "__main__":
    main()
|