diff --git a/Page Importer/app.py b/Page Importer/app.py index f498294..36b5796 100644 --- a/Page Importer/app.py +++ b/Page Importer/app.py @@ -2,6 +2,7 @@ from __future__ import annotations import csv import datetime as dt +import hashlib import io import re from dataclasses import replace @@ -13,6 +14,7 @@ from page_importer.models import ScrapeOptions, ScrapedPost from page_importer.scraper import Scraper from page_importer.wxr import build_wxr + def load_csv(file_data: bytes) -> tuple[list[str], list[dict[str, str]]]: text = file_data.decode("utf-8-sig", errors="replace") reader = csv.DictReader(io.StringIO(text)) @@ -20,6 +22,20 @@ def load_csv(file_data: bytes) -> tuple[list[str], list[dict[str, str]]]: return reader.fieldnames or [], rows +def build_upload_fingerprint(file_data: bytes) -> str: + return hashlib.sha256(file_data).hexdigest() + + +def sync_uploaded_file_state(session_state: dict[str, object], upload_fingerprint: str) -> None: + previous_fingerprint = session_state.get("uploaded_csv_fingerprint") + if previous_fingerprint == upload_fingerprint: + return + + for key in ("results", "input_rows", "input_headers", "scrape_context"): + session_state.pop(key, None) + session_state["uploaded_csv_fingerprint"] = upload_fingerprint + + def render_app() -> None: st.title("Page Importer") st.caption("Scrape blog posts from CSV URLs and export a WordPress WXR file.") @@ -47,7 +63,10 @@ def render_app() -> None: st.info("Upload a CSV to begin.") return - headers, rows = load_csv(uploaded.getvalue()) + uploaded_bytes = uploaded.getvalue() + sync_uploaded_file_state(st.session_state, build_upload_fingerprint(uploaded_bytes)) + + headers, rows = load_csv(uploaded_bytes) if not rows: st.error("The CSV did not contain any rows.") return @@ -88,6 +107,7 @@ def render_app() -> None: results = scrape_rows(rows, context, phase_label="Scraping") st.session_state["results"] = results st.session_state["input_rows"] = rows + st.session_state["input_headers"] = headers st.session_state["scrape_context"] = context results = st.session_state.get("results", []) @@ -176,7 +196,9 @@ def render_app() -> None: st.write(f"**Author:** {selected.author or '(missing)'}") st.write(f"**Post Type:** {selected.post_type}") st.write(selected.body_html, unsafe_allow_html=True) - render_export_sidebar(successful, rows, headers) + stored_rows = st.session_state.get("input_rows", rows) + stored_headers = st.session_state.get("input_headers", headers) + render_export_sidebar(successful, stored_rows, stored_headers) def build_scrape_context( diff --git a/Page Importer/tests/test_app_state.py b/Page Importer/tests/test_app_state.py new file mode 100644 index 0000000..4db1a76 --- /dev/null +++ b/Page Importer/tests/test_app_state.py @@ -0,0 +1,108 @@ +from __future__ import annotations + +import importlib.util +import pathlib +import sys +import types +import unittest + + +ROOT_DIR = pathlib.Path(__file__).resolve().parents[2] +PAGE_IMPORTER_DIR = ROOT_DIR / "Page Importer" +if str(PAGE_IMPORTER_DIR) not in sys.path: + sys.path.insert(0, str(PAGE_IMPORTER_DIR)) + +APP_MODULE = None + + +def load_app_module(): + original_modules = { + name: sys.modules.get(name) + for name in ( + "streamlit", + "page_importer.dates", + "page_importer.models", + "page_importer.scraper", + "page_importer.wxr", + "page_importer_app_test", + ) + } + + try: + sys.modules["streamlit"] = types.ModuleType("streamlit") + + dates_module = types.ModuleType("page_importer.dates") + dates_module.parse_datetime = lambda value: None + sys.modules["page_importer.dates"] = dates_module + + models_module = types.ModuleType("page_importer.models") + + class ScrapeOptions: + pass + + class ScrapedPost: + pass + + models_module.ScrapeOptions = ScrapeOptions + models_module.ScrapedPost = ScrapedPost + sys.modules["page_importer.models"] = models_module + + scraper_module = types.ModuleType("page_importer.scraper") + scraper_module.Scraper = object + sys.modules["page_importer.scraper"] = scraper_module + + wxr_module = types.ModuleType("page_importer.wxr") + wxr_module.build_wxr = lambda posts: "" + sys.modules["page_importer.wxr"] = wxr_module + + app_path = PAGE_IMPORTER_DIR / "app.py" + spec = importlib.util.spec_from_file_location("page_importer_app_test", app_path) + assert spec is not None and spec.loader is not None + module = importlib.util.module_from_spec(spec) + sys.modules["page_importer_app_test"] = module + spec.loader.exec_module(module) + return module + finally: + for name, original in original_modules.items(): + if original is None: + sys.modules.pop(name, None) + else: + sys.modules[name] = original + + +APP_MODULE = load_app_module() + + +class UploadStateTests(unittest.TestCase): + def test_sync_uploaded_file_state_clears_stale_results_for_new_file(self) -> None: + session_state = { + "uploaded_csv_fingerprint": "old", + "results": ["stale"], + "input_rows": [{"url": "https://example.com"}], + "input_headers": ["url"], + "scrape_context": {"url_column": "url"}, + } + + APP_MODULE.sync_uploaded_file_state(session_state, "new") + + self.assertEqual(session_state["uploaded_csv_fingerprint"], "new") + self.assertNotIn("results", session_state) + self.assertNotIn("input_rows", session_state) + self.assertNotIn("input_headers", session_state) + self.assertNotIn("scrape_context", session_state) + + def test_sync_uploaded_file_state_keeps_results_for_same_file(self) -> None: + session_state = { + "uploaded_csv_fingerprint": "same", + "results": ["keep"], + "input_rows": [{"url": "https://example.com"}], + } + + APP_MODULE.sync_uploaded_file_state(session_state, "same") + + self.assertEqual(session_state["results"], ["keep"]) + self.assertEqual(session_state["input_rows"], [{"url": "https://example.com"}]) + + +if __name__ == "__main__": + unittest.main() diff --git a/Page Importer/tests/test_sitemap_builder.py b/Page Importer/tests/test_sitemap_builder.py new file mode 100644 index 0000000..c0731f7 --- /dev/null +++ b/Page Importer/tests/test_sitemap_builder.py @@ -0,0 +1,86 @@ +from __future__ import annotations + +import importlib.util +import pathlib +import sys +import tempfile +import unittest + + +ROOT_DIR = pathlib.Path(__file__).resolve().parents[2] +MODULE_PATH = ROOT_DIR / "Sitemap Builder" / "sitemap_builder.py" +SPEC = importlib.util.spec_from_file_location("sitemap_builder_test", MODULE_PATH) +assert SPEC is not None and SPEC.loader is not None +MODULE = importlib.util.module_from_spec(SPEC) +sys.modules["sitemap_builder_test"] = MODULE +SPEC.loader.exec_module(MODULE) + + +class SitemapBuilderTests(unittest.TestCase): + def test_cleanup_run_files_deletes_generated_artifacts(self) -> None: + with tempfile.TemporaryDirectory() as temp_dir: + output_path = pathlib.Path(temp_dir) / "crawl.csv" + state_path = MODULE.get_state_path(output_path) + log_path = MODULE.get_log_path(output_path) + + output_path.write_text("csv", encoding="utf-8") + state_path.write_text("state", encoding="utf-8") + log_path.write_text("log", encoding="utf-8") + + removed_paths = MODULE.cleanup_run_files(output_path) + + self.assertCountEqual(removed_paths, [output_path, state_path, log_path]) + self.assertFalse(output_path.exists()) + self.assertFalse(state_path.exists()) + self.assertFalse(log_path.exists()) + + def test_run_crawl_rejects_resume_with_changed_subdomain_setting(self) -> None: + with tempfile.TemporaryDirectory() as temp_dir: + output_path = pathlib.Path(temp_dir) / "crawl.csv" + state_path = MODULE.get_state_path(output_path) + state = MODULE.initialize_state("https://example.com", include_subdomains=False, include_documents=False) + MODULE.save_state(state, state_path, output_path) + + with self.assertRaisesRegex(ValueError, "different subdomain setting"): + MODULE.run_crawl( + start_url="https://example.com", + output_path=output_path, + include_subdomains=True, + include_documents=False, + resume=True, + fresh=False, + ) + + def test_crawl_site_removes_dequeued_urls_from_queued_set(self) -> None: + original_fetch = MODULE.fetch_page_with_delay + try: + MODULE.fetch_page_with_delay = lambda url, timeout, user_agent, delay: MODULE.CrawlResult(url=url, links=[]) + + with tempfile.TemporaryDirectory() as temp_dir: + output_path = pathlib.Path(temp_dir) / "crawl.csv" + state_path = MODULE.get_state_path(output_path) + log_path = MODULE.get_log_path(output_path) + state = MODULE.initialize_state("https://example.com", include_subdomains=False, include_documents=False) + + final_state, user_stopped = MODULE.crawl_site( + state=state, + max_pages=1, + delay=0.0, + timeout=1.0, + user_agent=MODULE.DEFAULT_USER_AGENT, + state_path=state_path, + output_path=output_path, + log_path=log_path, + save_every=1, + workers=1, + ) + + self.assertFalse(user_stopped) + self.assertEqual(list(final_state.queue), []) + self.assertEqual(final_state.queued, set()) + finally: + MODULE.fetch_page_with_delay = original_fetch + + +if __name__ == "__main__": + unittest.main() diff --git a/Sitemap Builder/sitemap_builder.py b/Sitemap Builder/sitemap_builder.py index 454993b..8611897 100644 --- a/Sitemap Builder/sitemap_builder.py +++ b/Sitemap Builder/sitemap_builder.py @@ -175,6 +175,15 @@ def get_log_path(output_path: Path) -> Path: return output_path.with_suffix(output_path.suffix + DEFAULT_LOG_SUFFIX) +def cleanup_run_files(output_path: Path) -> list[Path]: + removed_paths: list[Path] = [] + for path in (Path(output_path), get_state_path(Path(output_path)), get_log_path(Path(output_path))): + if path.exists(): + path.unlink() + removed_paths.append(path) + return removed_paths + + def log_message(log_path: Path, message: str) -> None: log_path.parent.mkdir(parents=True, exist_ok=True) timestamp = time.strftime("%Y-%m-%d %H:%M:%S") @@ -599,6 +608,7 @@ def crawl_site( break current = resolve_alias(state.queue.popleft(), state.alias_to_canonical) + state.queued.discard(current) if current in state.visited: continue @@ -647,6 +657,7 @@ def crawl_site( while state.queue and len(pending) < workers and len(state.visited) < max_pages: current = resolve_alias(state.queue.popleft(), state.alias_to_canonical) + state.queued.discard(current) if current in state.visited: continue @@ -790,6 +801,11 @@ def run_crawl( "The saved crawl state uses a different document setting. " "Keep the same choice or start a fresh crawl." ) + if state.include_subdomains != include_subdomains: + raise ValueError( + "The saved crawl state uses a different subdomain setting. " + "Keep the same choice or start a fresh crawl." + ) else: state = initialize_state(normalized_start, include_subdomains, include_documents) diff --git a/app.py b/app.py index 9c310a2..4def6e5 100644 --- a/app.py +++ b/app.py @@ -137,6 +137,7 @@ def render_sitemap_tab() -> None: st.info("Run a crawl to generate a sitemap CSV.") return + sitemap_builder = get_sitemap_module() summary = result_data["summary"] csv_path = Path(result_data["output_path"]) state_path = Path(result_data["state_path"]) @@ -184,6 +185,19 @@ def render_sitemap_tab() -> None: mime="text/plain", ) + cleanup_targets = [path for path in (csv_path, state_path, log_path) if path.exists()] + if cleanup_targets: + st.caption("Cleanup removes the sitemap CSV, crawl state, and crawl log for this run.") + if st.button("Delete Crawl Files"): + removed_paths = sitemap_builder.cleanup_run_files(csv_path) + st.session_state.pop("sitemap_result", None) + if removed_paths: + removed_names = ", ".join(path.name for path in removed_paths) + st.success(f"Deleted: {removed_names}") + else: + st.info("No crawl files were present to delete.") + return + crawl_output = (result_data.get("stdout") or "").strip() if crawl_output: st.text_area("Crawler Output", value=crawl_output, height=220, disabled=True) @@ -196,12 +210,16 @@ def render_sitemap_tab() -> None: def main() -> None: st.set_page_config(page_title="WDW Tools", layout="wide") st.header("WDW Sitemap And Import Tools") - sitemap_tab, importer_tab = st.tabs(["Sitemap Generator", "Page Importer"]) + selected_tool = st.radio( + "Tool", + ["Sitemap Generator", "Page Importer"], + horizontal=True, + label_visibility="collapsed", + ) - with sitemap_tab: + if selected_tool == "Sitemap Generator": render_sitemap_tab() - - with importer_tab: + else: page_importer_app = get_page_importer_module() page_importer_app.render_app()