Edge Case fixes, bug fixes, and UI Cleanup.
Build Docker Image / docker (push) Successful in 6s

This commit is contained in:
2026-04-09 11:21:23 -07:00
parent 8667f547e6
commit 0e410a1f6c
5 changed files with 256 additions and 6 deletions
+24 -2
View File
@@ -2,6 +2,7 @@ from __future__ import annotations
import csv
import datetime as dt
import hashlib
import io
import re
from dataclasses import replace
@@ -13,6 +14,7 @@ from page_importer.models import ScrapeOptions, ScrapedPost
from page_importer.scraper import Scraper
from page_importer.wxr import build_wxr
def load_csv(file_data: bytes) -> tuple[list[str], list[dict[str, str]]]:
text = file_data.decode("utf-8-sig", errors="replace")
reader = csv.DictReader(io.StringIO(text))
@@ -20,6 +22,20 @@ def load_csv(file_data: bytes) -> tuple[list[str], list[dict[str, str]]]:
return reader.fieldnames or [], rows
def build_upload_fingerprint(file_data: bytes) -> str:
    """Return the SHA-256 hex digest of an uploaded file's raw bytes.

    Identical byte content always yields the same 64-character string, so the
    result can be compared across reruns to tell whether the upload changed.
    """
    digest = hashlib.sha256()
    digest.update(file_data)
    return digest.hexdigest()
def sync_uploaded_file_state(session_state: dict[str, object], upload_fingerprint: str) -> None:
    """Drop cached scrape data when the uploaded CSV differs from the last one.

    Args:
        session_state: Mutable mapping holding per-session values; modified
            in place.
        upload_fingerprint: Fingerprint of the file that is currently uploaded.

    When the fingerprint stored under ``"uploaded_csv_fingerprint"`` already
    equals ``upload_fingerprint`` nothing is touched. Otherwise the cached
    ``results``, ``input_rows``, ``input_headers`` and ``scrape_context``
    entries are removed (if present) and the new fingerprint is recorded.
    """
    if session_state.get("uploaded_csv_fingerprint") != upload_fingerprint:
        # A different file was uploaded (or this is the first upload), so any
        # data derived from the previous file is stale.
        stale_keys = ("results", "input_rows", "input_headers", "scrape_context")
        for stale_key in stale_keys:
            session_state.pop(stale_key, None)
        session_state["uploaded_csv_fingerprint"] = upload_fingerprint
def render_app() -> None:
st.title("Page Importer")
st.caption("Scrape blog posts from CSV URLs and export a WordPress WXR file.")
@@ -47,7 +63,10 @@ def render_app() -> None:
st.info("Upload a CSV to begin.")
return
headers, rows = load_csv(uploaded.getvalue())
uploaded_bytes = uploaded.getvalue()
sync_uploaded_file_state(st.session_state, build_upload_fingerprint(uploaded_bytes))
headers, rows = load_csv(uploaded_bytes)
if not rows:
st.error("The CSV did not contain any rows.")
return
@@ -88,6 +107,7 @@ def render_app() -> None:
results = scrape_rows(rows, context, phase_label="Scraping")
st.session_state["results"] = results
st.session_state["input_rows"] = rows
st.session_state["input_headers"] = headers
st.session_state["scrape_context"] = context
results = st.session_state.get("results", [])
@@ -176,7 +196,9 @@ def render_app() -> None:
st.write(f"**Author:** {selected.author or '(missing)'}")
st.write(f"**Post Type:** {selected.post_type}")
st.write(selected.body_html, unsafe_allow_html=True)
render_export_sidebar(successful, rows, headers)
stored_rows = st.session_state.get("input_rows", rows)
stored_headers = st.session_state.get("input_headers", headers)
render_export_sidebar(successful, stored_rows, stored_headers)
def build_scrape_context(
+108
View File
@@ -0,0 +1,108 @@
from __future__ import annotations
import importlib.util
import pathlib
import sys
import types
import unittest
# parents[2] is the directory three levels above this test file
# (presumably the repository root -- confirm against the repo layout).
ROOT_DIR = pathlib.Path(__file__).resolve().parents[2]
PAGE_IMPORTER_DIR = ROOT_DIR / "Page Importer"

# Put the app's folder at the front of sys.path, but only once, so repeated
# imports of this test module do not keep prepending the same entry.
_page_importer_entry = str(PAGE_IMPORTER_DIR)
if _page_importer_entry not in sys.path:
    sys.path.insert(0, _page_importer_entry)

# Placeholder binding; replaced by the loaded app module further down.
APP_MODULE = None
def load_app_module():
    """Execute ``app.py`` with its third-party and package imports stubbed.

    While the app file runs, ``sys.modules`` temporarily maps ``streamlit``
    and the ``page_importer.*`` submodules the app imports to minimal
    stand-ins, so the module can be loaded without those dependencies.
    Every touched ``sys.modules`` entry, including the temporary
    ``page_importer_app_test`` registration, is put back to its prior state
    before returning, whether or not loading succeeded.

    Returns:
        The executed module object built from ``PAGE_IMPORTER_DIR / "app.py"``.

    Raises:
        AssertionError: If no import spec or loader can be created for the
            app file.
    """
    # Marker for "this name was not in sys.modules". None cannot be used for
    # that purpose: sys.modules may legitimately map a name to None (which
    # makes importing it raise ModuleNotFoundError), and such an entry must be
    # restored rather than dropped.
    missing = object()
    touched_names = (
        "streamlit",
        "page_importer.dates",
        "page_importer.models",
        "page_importer.scraper",
        "page_importer.wxr",
        "page_importer_app_test",
    )
    original_modules = {name: sys.modules.get(name, missing) for name in touched_names}
    try:
        # An empty module is enough for the app's streamlit import to succeed.
        sys.modules["streamlit"] = types.ModuleType("streamlit")

        dates_module = types.ModuleType("page_importer.dates")
        dates_module.parse_datetime = lambda value: None
        sys.modules["page_importer.dates"] = dates_module

        models_module = types.ModuleType("page_importer.models")

        class ScrapeOptions:
            pass

        class ScrapedPost:
            pass

        models_module.ScrapeOptions = ScrapeOptions
        models_module.ScrapedPost = ScrapedPost
        sys.modules["page_importer.models"] = models_module

        scraper_module = types.ModuleType("page_importer.scraper")
        scraper_module.Scraper = object
        sys.modules["page_importer.scraper"] = scraper_module

        wxr_module = types.ModuleType("page_importer.wxr")
        wxr_module.build_wxr = lambda posts: ""
        sys.modules["page_importer.wxr"] = wxr_module

        app_path = PAGE_IMPORTER_DIR / "app.py"
        spec = importlib.util.spec_from_file_location("page_importer_app_test", app_path)
        assert spec is not None and spec.loader is not None
        module = importlib.util.module_from_spec(spec)
        # Register before executing, as in the importlib recipe for importing
        # a source file directly.
        sys.modules["page_importer_app_test"] = module
        spec.loader.exec_module(module)
        return module
    finally:
        # Undo every sys.modules change so the stubs cannot leak into other tests.
        for name, original in original_modules.items():
            if original is missing:
                sys.modules.pop(name, None)
            else:
                sys.modules[name] = original
# Load the app once at import time; the test cases below call its functions
# through this shared module object.
APP_MODULE = load_app_module()
class UploadStateTests(unittest.TestCase):
    """Tests for ``sync_uploaded_file_state`` as exposed by the loaded app module."""

    def test_sync_uploaded_file_state_clears_stale_results_for_new_file(self) -> None:
        """A new fingerprint removes all cached scrape entries and is stored."""
        cached_keys = ("results", "input_rows", "input_headers", "scrape_context")
        session_state = dict(
            uploaded_csv_fingerprint="old",
            results=["stale"],
            input_rows=[{"url": "https://example.com"}],
            input_headers=["url"],
            scrape_context={"url_column": "url"},
        )

        APP_MODULE.sync_uploaded_file_state(session_state, "new")

        self.assertEqual(session_state["uploaded_csv_fingerprint"], "new")
        for cached_key in cached_keys:
            self.assertNotIn(cached_key, session_state)

    def test_sync_uploaded_file_state_keeps_results_for_same_file(self) -> None:
        """An unchanged fingerprint leaves cached entries untouched."""
        expected_rows = [{"url": "https://example.com"}]
        session_state = dict(
            uploaded_csv_fingerprint="same",
            results=["keep"],
            input_rows=[{"url": "https://example.com"}],
        )

        APP_MODULE.sync_uploaded_file_state(session_state, "same")

        self.assertEqual(session_state["results"], ["keep"])
        self.assertEqual(session_state["input_rows"], expected_rows)


if __name__ == "__main__":
    unittest.main()
@@ -0,0 +1,86 @@
from __future__ import annotations
import importlib.util
import pathlib
import sys
import tempfile
import unittest
# parents[2] is the directory three levels above this test file
# (presumably the repository root -- confirm against the repo layout).
ROOT_DIR = pathlib.Path(__file__).resolve().parents[2]
# The target lives in a folder whose name contains a space, so it is loaded
# from its file path instead of through a regular import statement.
MODULE_PATH = ROOT_DIR / "Sitemap Builder" / "sitemap_builder.py"
SPEC = importlib.util.spec_from_file_location("sitemap_builder_test", MODULE_PATH)
# Fail at import time if no spec/loader could be created for the path.
assert SPEC is not None and SPEC.loader is not None
MODULE = importlib.util.module_from_spec(SPEC)
# Register under the test-only name before executing, as in the importlib
# recipe for importing a source file directly.
sys.modules["sitemap_builder_test"] = MODULE
SPEC.loader.exec_module(MODULE)
class SitemapBuilderTests(unittest.TestCase):
    """Tests for the Sitemap Builder's run-file cleanup, resume checks and queue state."""

    def test_cleanup_run_files_deletes_generated_artifacts(self) -> None:
        """``cleanup_run_files`` deletes the CSV, state and log files and returns their paths."""
        with tempfile.TemporaryDirectory() as workdir:
            csv_path = pathlib.Path(workdir) / "crawl.csv"
            state_path = MODULE.get_state_path(csv_path)
            log_path = MODULE.get_log_path(csv_path)
            artifacts = ((csv_path, "csv"), (state_path, "state"), (log_path, "log"))
            for artifact_path, contents in artifacts:
                artifact_path.write_text(contents, encoding="utf-8")

            removed_paths = MODULE.cleanup_run_files(csv_path)

            self.assertCountEqual(removed_paths, [csv_path, state_path, log_path])
            for artifact_path, _contents in artifacts:
                self.assertFalse(artifact_path.exists())

    def test_run_crawl_rejects_resume_with_changed_subdomain_setting(self) -> None:
        """Resuming with a different ``include_subdomains`` value raises ``ValueError``."""
        with tempfile.TemporaryDirectory() as workdir:
            csv_path = pathlib.Path(workdir) / "crawl.csv"
            state_path = MODULE.get_state_path(csv_path)
            # Persist a state that was created with subdomains disabled...
            saved_state = MODULE.initialize_state(
                "https://example.com",
                include_subdomains=False,
                include_documents=False,
            )
            MODULE.save_state(saved_state, state_path, csv_path)

            # ...then try to resume it with subdomains enabled.
            resume_kwargs = dict(
                start_url="https://example.com",
                output_path=csv_path,
                include_subdomains=True,
                include_documents=False,
                resume=True,
                fresh=False,
            )
            with self.assertRaisesRegex(ValueError, "different subdomain setting"):
                MODULE.run_crawl(**resume_kwargs)

    def test_crawl_site_removes_dequeued_urls_from_queued_set(self) -> None:
        """After crawling one link-free page, both the queue and the queued set are empty."""

        def fake_fetch(url, timeout, user_agent, delay):
            # Stand-in for the real fetch: every page "succeeds" with no links.
            return MODULE.CrawlResult(url=url, links=[])

        original_fetch = MODULE.fetch_page_with_delay
        try:
            MODULE.fetch_page_with_delay = fake_fetch
            with tempfile.TemporaryDirectory() as workdir:
                csv_path = pathlib.Path(workdir) / "crawl.csv"
                state_path = MODULE.get_state_path(csv_path)
                log_path = MODULE.get_log_path(csv_path)
                initial_state = MODULE.initialize_state(
                    "https://example.com",
                    include_subdomains=False,
                    include_documents=False,
                )
                crawl_kwargs = dict(
                    state=initial_state,
                    max_pages=1,
                    delay=0.0,
                    timeout=1.0,
                    user_agent=MODULE.DEFAULT_USER_AGENT,
                    state_path=state_path,
                    output_path=csv_path,
                    log_path=log_path,
                    save_every=1,
                    workers=1,
                )

                final_state, user_stopped = MODULE.crawl_site(**crawl_kwargs)

                self.assertFalse(user_stopped)
                self.assertEqual(list(final_state.queue), [])
                self.assertEqual(final_state.queued, set())
        finally:
            # Always restore the real fetch function, even if the test fails.
            MODULE.fetch_page_with_delay = original_fetch


if __name__ == "__main__":
    unittest.main()