From ead872a0a54f699aba39e6447f5c1fd6eec2f7cb Mon Sep 17 00:00:00 2001 From: Jeffrey Long Date: Thu, 9 Apr 2026 10:42:10 -0700 Subject: [PATCH] first commit --- .dockerignore | 13 + .gitea/workflows/docker-image.yml | 42 ++ .gitignore | 15 + Dockerfile | 22 + Page Importer/.gitignore | 12 + Page Importer/README.md | 63 ++ Page Importer/app.py | 475 ++++++++++++ Page Importer/page_importer/__init__.py | 1 + Page Importer/page_importer/dates.py | 26 + Page Importer/page_importer/models.py | 34 + Page Importer/page_importer/scraper.py | 555 ++++++++++++++ Page Importer/page_importer/wxr.py | 91 +++ Page Importer/requirements.txt | 4 + Page Importer/tests/test_regressions.py | 79 ++ README.md | 110 +++ Sitemap Builder/README.md | 80 ++ Sitemap Builder/sitemap_builder.py | 947 ++++++++++++++++++++++++ app.py | 210 ++++++ requirements.txt | 4 + 19 files changed, 2783 insertions(+) create mode 100644 .dockerignore create mode 100644 .gitea/workflows/docker-image.yml create mode 100644 .gitignore create mode 100644 Dockerfile create mode 100644 Page Importer/.gitignore create mode 100644 Page Importer/README.md create mode 100644 Page Importer/app.py create mode 100644 Page Importer/page_importer/__init__.py create mode 100644 Page Importer/page_importer/dates.py create mode 100644 Page Importer/page_importer/models.py create mode 100644 Page Importer/page_importer/scraper.py create mode 100644 Page Importer/page_importer/wxr.py create mode 100644 Page Importer/requirements.txt create mode 100644 Page Importer/tests/test_regressions.py create mode 100644 README.md create mode 100644 Sitemap Builder/README.md create mode 100644 Sitemap Builder/sitemap_builder.py create mode 100644 app.py create mode 100644 requirements.txt diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 0000000..b55b03e --- /dev/null +++ b/.dockerignore @@ -0,0 +1,13 @@ +.git +.gitignore +.codex +**/.git +**/.venv +**/__pycache__ +**/*.pyc +**/*.pyo +**/*.pyd +**/.pytest_cache 
+**/.mypy_cache +**/.DS_Store +.data diff --git a/.gitea/workflows/docker-image.yml b/.gitea/workflows/docker-image.yml new file mode 100644 index 0000000..b916af3 --- /dev/null +++ b/.gitea/workflows/docker-image.yml @@ -0,0 +1,42 @@ +name: Build Docker Image + +on: + push: + branches: + - main + workflow_dispatch: + +env: + IMAGE_NAME: wdw-sitemap-and-importer + REGISTRY: ${{ secrets.REGISTRY_URL }} + REGISTRY_USERNAME: ${{ secrets.REGISTRY_USERNAME }} + REGISTRY_PASSWORD: ${{ secrets.REGISTRY_PASSWORD }} + +jobs: + docker: + runs-on: ubuntu-latest + steps: + - name: Check out repository + uses: actions/checkout@v4 + + - name: Build image + run: docker build -t "${IMAGE_NAME}:${GITHUB_SHA}" . + + - name: Tag latest image + run: docker tag "${IMAGE_NAME}:${GITHUB_SHA}" "${IMAGE_NAME}:latest" + + - name: Log in to registry + if: ${{ env.REGISTRY != '' && env.REGISTRY_USERNAME != '' && env.REGISTRY_PASSWORD != '' }} + run: echo "${REGISTRY_PASSWORD}" | docker login "${REGISTRY}" -u "${REGISTRY_USERNAME}" --password-stdin + + - name: Push commit image + if: ${{ env.REGISTRY != '' && env.REGISTRY_USERNAME != '' && env.REGISTRY_PASSWORD != '' }} + run: | + docker tag "${IMAGE_NAME}:${GITHUB_SHA}" "${REGISTRY}/${IMAGE_NAME}:${GITHUB_SHA}" + docker push "${REGISTRY}/${IMAGE_NAME}:${GITHUB_SHA}" + + - name: Push latest image + if: ${{ env.REGISTRY != '' && env.REGISTRY_USERNAME != '' && env.REGISTRY_PASSWORD != '' }} + run: | + docker tag "${IMAGE_NAME}:latest" "${REGISTRY}/${IMAGE_NAME}:latest" + docker push "${REGISTRY}/${IMAGE_NAME}:latest" diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..449cee2 --- /dev/null +++ b/.gitignore @@ -0,0 +1,15 @@ +.codex +.data/ +__pycache__/ +*.py[cod] + +.venv/ +**/.venv/ +**/__pycache__/ +.pytest_cache/ +.mypy_cache/ + +*.crawl.log +*.crawlstate.json + +streamlit_uploads/ diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..9bc88e9 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,22 @@ +FROM 
python:3.14-slim + +ENV PYTHONDONTWRITEBYTECODE=1 \ + PYTHONUNBUFFERED=1 \ + PIP_NO_CACHE_DIR=1 \ + STREAMLIT_SERVER_HEADLESS=true \ + STREAMLIT_SERVER_PORT=8501 \ + STREAMLIT_SERVER_ADDRESS=0.0.0.0 \ + APP_DATA_DIR=/data + +WORKDIR /app + +COPY requirements.txt ./requirements.txt +RUN pip install -r requirements.txt + +COPY . . + +RUN mkdir -p /data + +EXPOSE 8501 + +CMD ["streamlit", "run", "app.py"] diff --git a/Page Importer/.gitignore b/Page Importer/.gitignore new file mode 100644 index 0000000..e357643 --- /dev/null +++ b/Page Importer/.gitignore @@ -0,0 +1,12 @@ +.venv/ +__pycache__/ +*.py[cod] +*$py.class + +.pytest_cache/ +.mypy_cache/ +.ruff_cache/ + +.streamlit/secrets.toml + +*.log diff --git a/Page Importer/README.md b/Page Importer/README.md new file mode 100644 index 0000000..d4f830d --- /dev/null +++ b/Page Importer/README.md @@ -0,0 +1,63 @@ +# Page Importer + +This folder contains the WordPress import tool used by the combined application in the repository root. + +The importer still uses Streamlit internally, but it is now rendered as the `Page Importer` tab inside the shared app rather than being the main entrypoint for the repository. + +## Features + +- Upload a CSV of submitted URLs +- Choose the URL column and optional title override column +- Optionally map post type from the CSV or force a single post type +- Scrape only the listed URLs +- Extract title, publish date, author, body HTML, categories, and tags +- Retry failed rows +- Export a WordPress WXR XML file + +## Recommended Usage + +Run the root application: + +```bash +streamlit run ../app.py +``` + +Or run the combined Docker container from the repository root. 
+ +## Standalone Usage + +If you need to run this importer by itself: + +```bash +python3 -m venv .venv +source .venv/bin/activate +pip install -r requirements.txt +streamlit run app.py +``` + +On Windows PowerShell: + +```powershell +python -m venv .venv +.venv\Scripts\Activate.ps1 +pip install -r requirements.txt +streamlit run app.py +``` + +## CSV Input + +The app accepts CSV files with any columns. You choose: + +- the URL column to scrape +- an optional title or name column to override the scraped title +- an optional post type column with values like `post` or `page` +- an optional category column whose values are appended during export + +You can also add manual categories in the sidebar to append them to every exported item. + +## Notes + +- Exported posts default to `draft` unless changed in the UI +- Image and link URLs remain pointed at the source site +- Some themes need heuristic fallback. The `Force heuristic scraping` option skips JSON-LD-first extraction and relies on page structure +- In the combined app, dependencies come from the root `requirements.txt` diff --git a/Page Importer/app.py b/Page Importer/app.py new file mode 100644 index 0000000..f498294 --- /dev/null +++ b/Page Importer/app.py @@ -0,0 +1,475 @@ +from __future__ import annotations + +import csv +import datetime as dt +import io +import re +from dataclasses import replace + +import streamlit as st + +from page_importer.dates import parse_datetime +from page_importer.models import ScrapeOptions, ScrapedPost +from page_importer.scraper import Scraper +from page_importer.wxr import build_wxr + +def load_csv(file_data: bytes) -> tuple[list[str], list[dict[str, str]]]: + text = file_data.decode("utf-8-sig", errors="replace") + reader = csv.DictReader(io.StringIO(text)) + rows = list(reader) + return reader.fieldnames or [], rows + + +def render_app() -> None: + st.title("Page Importer") + st.caption("Scrape blog posts from CSV URLs and export a WordPress WXR file.") + + with st.sidebar: 
+ st.header("Options") + include_author = st.checkbox("Include author", value=True) + include_categories = st.checkbox("Include categories", value=True) + include_tags = st.checkbox("Include tags", value=True) + force_heuristics = st.checkbox("Force heuristic scraping", value=False) + test_run = st.checkbox( + "Test run only", + value=False, + help="Scrape only the first 10 rows that contain a URL.", + ) + post_type_mode = st.selectbox( + "WordPress post type mode", + ["Single type for all rows", "Use a CSV column"], + index=0, + ) + default_post_type = st.selectbox("Default WordPress post type", ["post", "page"], index=0) + + uploaded = st.file_uploader("Upload CSV", type=["csv"]) + if not uploaded: + st.info("Upload a CSV to begin.") + return + + headers, rows = load_csv(uploaded.getvalue()) + if not rows: + st.error("The CSV did not contain any rows.") + return + + col1, col2, col3 = st.columns(3) + with col1: + url_column = st.selectbox("URL column", headers, index=_safe_index(headers, ["url", "link"])) + with col2: + title_column = st.selectbox( + "Optional title override column", + ["(none)", *headers], + index=_safe_index(["(none)", *headers], ["name", "title"]), + ) + with col3: + post_type_column = st.selectbox( + "Optional post type column", + ["(none)", *headers], + index=_safe_index(["(none)", *headers], ["post_type", "type"]), + disabled=post_type_mode != "Use a CSV column", + ) + st.write(f"Loaded {len(rows)} row(s). Only the selected URL column will be scraped.") + if test_run: + st.caption("Test run is enabled. 
Only the first 10 rows with a URL will be scraped.") + + if st.button("Scrape URLs", type="primary"): + context = build_scrape_context( + include_author=include_author, + include_categories=include_categories, + include_tags=include_tags, + force_heuristics=force_heuristics, + test_run=test_run, + post_type_mode=post_type_mode, + post_type_column=post_type_column, + default_post_type=default_post_type, + url_column=url_column, + title_column=title_column, + ) + results = scrape_rows(rows, context, phase_label="Scraping") + st.session_state["results"] = results + st.session_state["input_rows"] = rows + st.session_state["scrape_context"] = context + + results = st.session_state.get("results", []) + if not results: + return + + successful = [post for post in results if post.success] + failed = [post for post in results if not post.success] + + st.subheader("Results") + st.write(f"Successful: {len(successful)} | Failed: {len(failed)}") + + if failed and st.button("Retry failed items"): + stored_rows = st.session_state.get("input_rows", rows) + context = st.session_state.get("scrape_context") + if context: + retried = scrape_rows( + stored_rows, + context, + row_numbers=[post.row_number for post in failed if post.row_number], + phase_label="Retrying", + ) + results = merge_retry_results(results, retried) + st.session_state["results"] = results + successful = [post for post in results if post.success] + failed = [post for post in results if not post.success] + + preview_rows = [] + for post in results: + preview_rows.append( + { + "Row": post.row_number, + "URL": post.source_url, + "CMS": post.cms, + "Success": post.success, + "Title": post.title, + "Publish Date": post.publish_date, + "Author": post.author, + "Categories": ", ".join(post.categories), + "Tags": ", ".join(post.tags), + "Post Type": post.post_type, + "Error": post.error, + } + ) + st.dataframe( + preview_rows, + width="stretch", + hide_index=True, + column_config={ + "Row": 
st.column_config.NumberColumn(width="small"), + "URL": st.column_config.TextColumn(width="medium"), + "Title": st.column_config.TextColumn(width="medium"), + "Publish Date": st.column_config.TextColumn(width="medium"), + "Categories": st.column_config.TextColumn(width="medium"), + "Tags": st.column_config.TextColumn(width="medium"), + "Error": st.column_config.TextColumn(width="large"), + }, + ) + + if failed: + selected_failed = st.selectbox( + "Failed row details", + failed, + format_func=lambda post: f"Row {post.row_number}: {post.source_url or '(missing URL)'}", + ) + st.text_area( + "Error details", + value=selected_failed.error_details or selected_failed.error, + height=180, + disabled=True, + ) + + if successful: + selected_index = st.number_input( + "Preview successful row", + min_value=1, + max_value=len(successful), + value=1, + step=1, + ) + selected = successful[selected_index - 1] + st.markdown("### Content Preview") + st.write(f"**Title:** {selected.title}") + st.write(f"**Source URL:** {selected.source_url}") + st.write(f"**Publish Date:** {selected.publish_date or '(missing)'}") + st.write(f"**Author:** {selected.author or '(missing)'}") + st.write(f"**Post Type:** {selected.post_type}") + st.write(selected.body_html, unsafe_allow_html=True) + render_export_sidebar(successful, rows, headers) + + +def build_scrape_context( + *, + include_author: bool, + include_categories: bool, + include_tags: bool, + force_heuristics: bool, + test_run: bool, + post_type_mode: str, + post_type_column: str, + default_post_type: str, + url_column: str, + title_column: str, +) -> dict[str, object]: + return { + "options": ScrapeOptions( + include_author=include_author, + include_categories=include_categories, + include_tags=include_tags, + force_heuristics=force_heuristics, + ), + "test_run": test_run, + "post_type_mode": post_type_mode, + "post_type_column": post_type_column, + "default_post_type": default_post_type, + "url_column": url_column, + "title_column": 
title_column, + } + + +def scrape_rows( + rows: list[dict[str, str]], + context: dict[str, object], + row_numbers: list[int] | None = None, + phase_label: str = "Scraping", +) -> list[ScrapedPost]: + options = context["options"] + if not isinstance(options, ScrapeOptions): + raise TypeError("Invalid scrape options in session state.") + + scraper = Scraper(options) + targets = list(enumerate(rows, start=1)) + if row_numbers is not None: + requested_rows = set(row_numbers) + targets = [(row_number, row) for row_number, row in targets if row_number in requested_rows] + elif bool(context.get("test_run")): + targets = [ + (row_number, row) + for row_number, row in targets + if (row.get(str(context["url_column"])) or "").strip() + ][:10] + + results: list[ScrapedPost] = [] + progress = st.progress(0.0) + status = st.empty() + + total = len(targets) or 1 + for index, (row_number, row) in enumerate(targets, start=1): + url = (row.get(context["url_column"]) or "").strip() + status.write(f"{phase_label} {index}/{len(targets)}: {url or f'row {row_number} has no URL'}") + + if url: + post = scraper.scrape(url) + else: + post = ScrapedPost( + source_url="", + row_number=row_number, + error="Missing URL in the selected URL column.", + error_details=f"Row {row_number} does not contain a URL in column '{context['url_column']}'.", + ) + + post.row_number = row_number + apply_row_overrides(post, row, context) + results.append(post) + progress.progress(index / total) + + status.write(f"{phase_label} complete.") + return results + + +def apply_row_overrides(post: ScrapedPost, row: dict[str, str], context: dict[str, object]) -> None: + title_column = context["title_column"] + if isinstance(title_column, str) and title_column != "(none)" and row.get(title_column): + post.title = row[title_column].strip() + + post.post_type = resolve_post_type( + row=row, + mode=str(context["post_type_mode"]), + column=str(context["post_type_column"]), + default_value=str(context["default_post_type"]), + 
) + + +def resolve_export_categories( + row: dict[str, str], + category_column: str, + manual_categories: list[str], +) -> list[str]: + csv_categories = parse_terms(row.get(category_column, "")) if category_column != "(none)" else [] + return merge_unique_terms(csv_categories, manual_categories) + + +def parse_terms(value: str) -> list[str]: + return [term.strip() for term in re.split(r"[,|>]", value or "") if term.strip()] + + +def merge_unique_terms(*groups: list[str]) -> list[str]: + merged: list[str] = [] + for group in groups: + for term in group: + cleaned = term.strip() + if cleaned and cleaned not in merged: + merged.append(cleaned) + return merged + + +def merge_retry_results(existing: list[ScrapedPost], replacements: list[ScrapedPost]) -> list[ScrapedPost]: + replacement_map = {post.row_number: post for post in replacements} + merged = [replacement_map.get(post.row_number, post) for post in existing] + return sorted(merged, key=lambda post: post.row_number or 0) + + +def build_export_posts( + posts: list[ScrapedPost], + rows: list[dict[str, str]], + category_column: str, + manual_categories: list[str], + post_status: str, + custom_post_type_slug: str, +) -> list[ScrapedPost]: + export_posts: list[ScrapedPost] = [] + for post in posts: + row = rows[post.row_number - 1] if 0 < post.row_number <= len(rows) else {} + export_posts.append( + replace( + post, + status=post_status, + post_type=custom_post_type_slug or post.post_type, + categories=merge_unique_terms( + post.categories, + resolve_export_categories(row, category_column, manual_categories), + ), + ) + ) + return export_posts + + +def render_export_sidebar( + successful: list[ScrapedPost], + rows: list[dict[str, str]], + headers: list[str], +) -> None: + with st.sidebar: + st.markdown("---") + st.subheader("Export") + post_status = st.selectbox( + "Imported post status", + ["draft", "publish", "private"], + index=0, + key="export_post_status", + ) + category_column = st.selectbox( + "CSV category 
column", + ["(none)", *headers], + index=_safe_index(["(none)", *headers], ["category", "categories", "department"]), + key="export_category_column", + ) + manual_categories = parse_terms( + st.text_input( + "Additional export categories", + value="", + help="Comma-separated categories to append to every exported item.", + key="export_manual_categories", + ) + ) + output_name = st.text_input( + "Output filename", + value="wordpress-import.xml", + key="export_output_name", + ) + custom_post_type_slug = normalize_post_type_slug( + st.text_input( + "Custom post type slug", + value="", + help="Optional. If set, all exported items will use this WordPress post type slug.", + key="export_custom_post_type_slug", + ) + ) + + export_posts = build_export_posts( + successful, + rows, + category_column, + manual_categories, + post_status, + custom_post_type_slug, + ) + if custom_post_type_slug: + st.caption(f"Exporting all items as post type `{custom_post_type_slug}`.") + dated_export_posts = [(post, publish_date) for post in export_posts if (publish_date := parse_publish_date(post.publish_date))] + + if dated_export_posts: + min_date = min(publish_date for _, publish_date in dated_export_posts) + max_date = max(publish_date for _, publish_date in dated_export_posts) + filter_by_publish_date = st.checkbox( + "Filter export by publish date", + value=False, + key="export_filter_by_publish_date", + ) + + if filter_by_publish_date: + export_start = st.date_input( + "Export start date", + value=min_date, + min_value=min_date, + max_value=max_date, + format="MM/DD/YYYY", + key="export_start_date", + ) + export_end = st.date_input( + "Export end date", + value=max_date, + min_value=min_date, + max_value=max_date, + format="MM/DD/YYYY", + key="export_end_date", + ) + + if export_start > export_end: + st.error("Export start date must be on or before the end date.") + export_posts = [] + else: + export_posts = [ + post + for post in export_posts + if (publish_date := 
parse_publish_date(post.publish_date)) and export_start <= publish_date <= export_end + ] + st.caption( + "Date filter: " + f"{export_start.strftime('%m/%d/%Y')} to {export_end.strftime('%m/%d/%Y')}." + ) + undated_count = len(successful) - len(dated_export_posts) + if undated_count: + st.caption(f"Excluded {undated_count} successful item(s) with no publish date.") + else: + st.caption("No successful items have a publish date, so export date filtering is unavailable.") + + st.caption(f"Ready to export {len(export_posts)} post(s).") + xml_data = build_wxr(export_posts) + st.download_button( + label="Download WXR XML", + data=xml_data, + file_name=output_name, + mime="application/xml", + disabled=not export_posts, + ) + + +def parse_publish_date(value: str) -> dt.date | None: + parsed = parse_datetime(value) + if parsed is None: + return None + return parsed.date() + + +def _safe_index(values: list[str], candidates: list[str]) -> int: + lowered = {value.lower(): idx for idx, value in enumerate(values)} + for candidate in candidates: + if candidate in lowered: + return lowered[candidate] + return 0 + + +def resolve_post_type( + row: dict[str, str], + mode: str, + column: str, + default_value: str, +) -> str: + if mode != "Use a CSV column" or column == "(none)": + return default_value + + raw_value = normalize_post_type_slug(row.get(column) or "") + if raw_value: + return raw_value + return default_value + + +def normalize_post_type_slug(value: str) -> str: + return re.sub(r"[^a-z0-9_-]", "", (value or "").strip().lower()) + + +if __name__ == "__main__": + st.set_page_config(page_title="Page Importer", layout="wide") + render_app() diff --git a/Page Importer/page_importer/__init__.py b/Page Importer/page_importer/__init__.py new file mode 100644 index 0000000..8b13789 --- /dev/null +++ b/Page Importer/page_importer/__init__.py @@ -0,0 +1 @@ + diff --git a/Page Importer/page_importer/dates.py b/Page Importer/page_importer/dates.py new file mode 100644 index 
0000000..82a00c1 --- /dev/null +++ b/Page Importer/page_importer/dates.py @@ -0,0 +1,26 @@ +from __future__ import annotations + +import datetime as dt + +from dateutil import parser as date_parser + + +def parse_datetime(value: str | None) -> dt.datetime | None: + if not value: + return None + try: + return date_parser.parse(value) + except (TypeError, ValueError, OverflowError): + try: + return date_parser.parse(value, fuzzy=True) + except (TypeError, ValueError, OverflowError): + return None + + +def normalize_date(value: str | None) -> str: + parsed = parse_datetime(value) + if parsed is None: + return "" + if parsed.tzinfo is None or parsed.utcoffset() is None: + return parsed.strftime("%Y-%m-%d %H:%M:%S") + return parsed.isoformat(sep=" ", timespec="seconds") diff --git a/Page Importer/page_importer/models.py b/Page Importer/page_importer/models.py new file mode 100644 index 0000000..7e7c219 --- /dev/null +++ b/Page Importer/page_importer/models.py @@ -0,0 +1,34 @@ +from __future__ import annotations + +from dataclasses import dataclass, field + + +@dataclass +class ScrapeOptions: + include_author: bool = True + include_categories: bool = True + include_tags: bool = True + force_heuristics: bool = False + request_timeout: int = 20 + user_agent: str = ( + "Mozilla/5.0 (Windows NT 10.0; Win64; x64) " + "AppleWebKit/537.36 (KHTML, like Gecko) Chrome/135.0 Safari/537.36" + ) + + +@dataclass +class ScrapedPost: + source_url: str + row_number: int = 0 + cms: str = "unknown" + title: str = "" + publish_date: str = "" + author: str = "" + body_html: str = "" + categories: list[str] = field(default_factory=list) + tags: list[str] = field(default_factory=list) + status: str = "draft" + post_type: str = "post" + success: bool = False + error: str = "" + error_details: str = "" diff --git a/Page Importer/page_importer/scraper.py b/Page Importer/page_importer/scraper.py new file mode 100644 index 0000000..a800220 --- /dev/null +++ b/Page Importer/page_importer/scraper.py 
@@ -0,0 +1,555 @@ +from __future__ import annotations + +import json +import re +import traceback +from html import unescape +from typing import Iterable + +import requests +from bs4 import BeautifulSoup +from bs4.element import NavigableString, Tag + +from page_importer.dates import normalize_date +from page_importer.models import ScrapeOptions, ScrapedPost + +JSON_ARTICLE_TYPES = { + "article", + "blogposting", + "newsarticle", + "report", + "webpage", +} + +BODY_SELECTORS = [ + "article .entry-content", + "article .post-content", + "article .node__content", + "article .node .content", + "article .node-content", + "article .field-name-body .field-item", + "article .field-name-body", + "article .field--name-body", + "article .article-body", + "article .content", + ".post-content", + ".entry-content", + ".node__content", + ".node .content", + ".node-content", + ".field-name-body .field-item", + ".field-name-body", + ".field--name-body", + ".article-body", + "#content-area .node .content", + "article", + "main article", + "main", +] + +CATEGORY_SELECTORS = [ + ".cat-links a", + ".post-categories a", + ".field--name-field-category a", + ".tags a[rel='category tag']", + ".terms a", + ".taxonomy a", +] + +TAG_SELECTORS = [ + ".tags-links a", + ".post-tags a", + ".field--name-field-tags a", + "a[rel='tag']", + ".terms a", +] + +AUTHOR_SELECTORS = [ + "[rel='author']", + ".author a", + ".byline a", + ".submitted a", + ".node__submitted a", + ".node-info a", + ".createdby", +] + +DATE_SELECTORS = [ + "time[datetime]", + "meta[property='article:published_time']", + "meta[name='publish_date']", + "meta[name='pubdate']", + ".date-display-single", + ".submitted", + ".node-info", +] + +DRUPAL_TITLE_DATE_PATTERN = re.compile( + r"(Monday|Tuesday|Wednesday|Thursday|Friday|Saturday|Sunday),\s+" + r"([A-Za-z]+)\s+\d{1,2},\s+\d{4}" +) + + +class Scraper: + def __init__(self, options: ScrapeOptions) -> None: + self.options = options + self.session = requests.Session() + 
self.session.headers.update({"User-Agent": options.user_agent}) + + def scrape(self, url: str) -> ScrapedPost: + post = ScrapedPost(source_url=url) + response: requests.Response | None = None + try: + response = self.session.get(url, timeout=self.options.request_timeout) + response.raise_for_status() + soup = BeautifulSoup(response.text, "html.parser") + post.cms = detect_cms(soup) + + article_data = extract_article_json_ld(soup) + if article_data and not self.options.force_heuristics: + apply_article_data(post, article_data, soup, self.options) + + merge_fallback_data(post, soup, self.options) + post.body_html = sanitize_html(post.body_html) + + missing_fields = [field for field, value in {"title": post.title, "body_html": post.body_html}.items() if not value] + if missing_fields: + raise ValueError( + "Unable to extract required field(s): " + f"{', '.join(missing_fields)}. " + f"Detected CMS: {post.cms}. " + f"Publish date found: {'yes' if post.publish_date else 'no'}. " + f"Author found: {'yes' if post.author else 'no'}." 
+ ) + + post.success = True + return post + except Exception as exc: + post.error = format_error_summary(url, exc, response, self.options.request_timeout) + post.error_details = format_error_details(url, exc, response) + return post + + +def detect_cms(soup: BeautifulSoup) -> str: + generator = meta_content(soup, "meta", {"name": "generator"}) + html = str(soup).lower() + if generator: + g = generator.lower() + if "wordpress" in g: + return "wordpress" + if "drupal" in g: + return "drupal" + if "joomla" in g: + return "joomla" + if "/wp-content/" in html: + return "wordpress" + if "drupal-settings-json" in html or "sites/default/files" in html: + return "drupal" + if "com_content" in html or "joomla" in html: + return "joomla" + return "unknown" + + +def extract_article_json_ld(soup: BeautifulSoup) -> dict | None: + for script in soup.select("script[type='application/ld+json']"): + raw = script.string or script.get_text(" ", strip=True) + if not raw: + continue + for payload in parse_json_candidates(raw): + article = find_article_payload(payload) + if article: + return article + return None + + +def parse_json_candidates(raw: str) -> Iterable[dict | list]: + try: + data = json.loads(raw) + yield data + return + except json.JSONDecodeError: + pass + + cleaned = re.sub(r"[\x00-\x1f]+", " ", raw).strip() + try: + data = json.loads(cleaned) + yield data + except json.JSONDecodeError: + return + + +def find_article_payload(payload: dict | list) -> dict | None: + if isinstance(payload, list): + for item in payload: + found = find_article_payload(item) + if found: + return found + return None + if not isinstance(payload, dict): + return None + if "@graph" in payload: + found = find_article_payload(payload["@graph"]) + if found: + return found + node_type = payload.get("@type") + types = {node_type.lower()} if isinstance(node_type, str) else { + item.lower() for item in node_type or [] if isinstance(item, str) + } + if types & JSON_ARTICLE_TYPES: + return payload + return 
None + + +def apply_article_data( + post: ScrapedPost, + article: dict, + soup: BeautifulSoup, + options: ScrapeOptions, +) -> None: + post.title = article.get("headline") or article.get("name") or post.title + post.publish_date = normalize_date( + article.get("datePublished") or article.get("dateCreated") or post.publish_date + ) + if options.include_author: + post.author = extract_author_from_json_ld(article) or post.author + if options.include_categories: + post.categories = normalize_terms(article.get("articleSection")) or post.categories + if options.include_tags: + post.tags = normalize_terms(article.get("keywords")) or post.tags + post.body_html = extract_body_from_article(article, soup) or post.body_html + + +def merge_fallback_data(post: ScrapedPost, soup: BeautifulSoup, options: ScrapeOptions) -> None: + if not post.title: + post.title = extract_title(soup) + if not post.publish_date: + post.publish_date = extract_date(soup, post.cms) + if options.include_author and not post.author: + post.author = extract_author(soup) + if not post.body_html: + post.body_html = extract_body(soup) + if options.include_categories: + post.categories = merge_terms(post.categories, extract_terms(soup, CATEGORY_SELECTORS)) + if post.cms == "drupal": + post.categories = merge_terms(post.categories, extract_drupal_department_categories(soup)) + if options.include_tags and not post.tags: + post.tags = extract_terms(soup, TAG_SELECTORS) + + +def extract_title(soup: BeautifulSoup) -> str: + og_title = meta_content(soup, "meta", {"property": "og:title"}) + if og_title: + return og_title + for selector in ("article h1", "h1.entry-title", "h1.page-title", "h1.title", "h1"): + node = soup.select_one(selector) + if node: + return clean_text(node.get_text(" ", strip=True)) + return clean_text(soup.title.get_text(" ", strip=True)) if soup.title else "" + + +def extract_date(soup: BeautifulSoup, cms: str = "unknown") -> str: + for selector in DATE_SELECTORS: + node = 
soup.select_one(selector) + if not node: + continue + candidate = node.get("datetime") or node.get("content") or node.get_text(" ", strip=True) + normalized = normalize_date(candidate) + if normalized: + return normalized + if cms == "drupal": + return extract_drupal_title_adjacent_date(soup) + return "" + + +def extract_author(soup: BeautifulSoup) -> str: + author = meta_content(soup, "meta", {"name": "author"}) + if author: + return clean_text(author) + for selector in AUTHOR_SELECTORS: + node = soup.select_one(selector) + if node: + return clean_text(node.get_text(" ", strip=True)) + return "" + + +def extract_body(soup: BeautifulSoup) -> str: + fallback_html = "" + for selector in BODY_SELECTORS: + node = soup.select_one(selector) + if not node: + continue + candidate = clone_tag(node) + strip_unwanted(candidate) + html = candidate.decode_contents().strip() + text_length = len(BeautifulSoup(html, "html.parser").get_text(" ", strip=True)) + if text_length >= 120: + return html + if not fallback_html and has_meaningful_body_content(html): + fallback_html = html + return fallback_html + + +def extract_terms(soup: BeautifulSoup, selectors: list[str]) -> list[str]: + terms: list[str] = [] + for selector in selectors: + for node in soup.select(selector): + term = clean_text(node.get_text(" ", strip=True)) + if term and term not in terms: + terms.append(term) + return terms + + +def extract_drupal_title_adjacent_date(soup: BeautifulSoup) -> str: + title_node = find_title_node(soup) + if not title_node: + return "" + + for sibling in title_node.next_siblings: + candidate = text_from_node(sibling) + normalized = normalize_drupal_date(candidate) + if normalized: + return normalized + + header = title_node.find_parent(["header", "div", "section"]) + if header: + header_text = clean_text(header.get_text(" ", strip=True)) + title_text = clean_text(title_node.get_text(" ", strip=True)) + if title_text and header_text.startswith(title_text): + header_text = 
clean_text(header_text[len(title_text):]) + normalized = normalize_drupal_date(header_text) + if normalized: + return normalized + + return "" + + +def extract_drupal_department_categories(soup: BeautifulSoup) -> list[str]: + categories: list[str] = [] + label_pattern = re.compile(r"^\s*Department:\s*$", re.IGNORECASE) + + for label_node in soup.find_all(string=label_pattern): + parent = label_node.parent if isinstance(label_node.parent, Tag) else None + if not parent: + continue + + inline_value = extract_labeled_value(parent.get_text(" ", strip=True), "Department") + normalized_inline_value = normalize_department_category(inline_value) + if normalized_inline_value: + categories = merge_terms(categories, [normalized_inline_value]) + continue + + for sibling in parent.next_siblings: + value = normalize_department_category(text_from_node(sibling)) + if value: + categories = merge_terms(categories, [value]) + break + + for candidate in soup.find_all(["p", "li", "span", "dt", "dd"]): + text = clean_text(candidate.get_text(" ", strip=True)) + if not text.lower().startswith("department:"): + continue + extracted = normalize_department_category(extract_labeled_value(text, "Department")) + if extracted: + categories = merge_terms(categories, [extracted]) + + return categories + + +def extract_author_from_json_ld(article: dict) -> str: + author = article.get("author") + if isinstance(author, dict): + return clean_text(author.get("name", "")) + if isinstance(author, list): + names = [clean_text(item.get("name", "")) for item in author if isinstance(item, dict)] + return ", ".join(name for name in names if name) + if isinstance(author, str): + return clean_text(author) + return "" + + +def extract_body_from_article(article: dict, soup: BeautifulSoup) -> str: + body = article.get("articleBody") + if isinstance(body, str) and len(body.strip()) > 120: + return f"

{unescape(body.strip())}

" + return extract_body(soup) + + +def normalize_terms(value: object) -> list[str]: + if isinstance(value, str): + parts = re.split(r"[,|>]", value) + return [clean_text(part) for part in parts if clean_text(part)] + if isinstance(value, list): + result: list[str] = [] + for item in value: + if isinstance(item, str): + cleaned = clean_text(item) + if cleaned and cleaned not in result: + result.append(cleaned) + return result + return [] + + +def merge_terms(*groups: list[str]) -> list[str]: + merged: list[str] = [] + for group in groups: + for item in group: + cleaned = clean_text(item) + if cleaned and cleaned not in merged: + merged.append(cleaned) + return merged + + +def normalize_drupal_date(value: str | None) -> str: + if not value: + return "" + match = DRUPAL_TITLE_DATE_PATTERN.search(value) + if not match: + return "" + return normalize_date(match.group(0)) + + +def meta_content(soup: BeautifulSoup, tag_name: str, attrs: dict[str, str]) -> str: + node = soup.find(tag_name, attrs=attrs) + if node and node.get("content"): + return node["content"].strip() + return "" + + +def clean_text(value: str) -> str: + return re.sub(r"\s+", " ", value or "").strip() + + +def text_from_node(node: object) -> str: + if isinstance(node, NavigableString): + return clean_text(str(node)) + if isinstance(node, Tag): + return clean_text(node.get_text(" ", strip=True)) + return "" + + +def sanitize_html(html: str) -> str: + if not html: + return "" + soup = BeautifulSoup(html, "html.parser") + strip_unwanted(soup) + strip_dangerous_attributes(soup) + return soup.decode_contents().strip() + + +def has_meaningful_body_content(html: str) -> bool: + if not html: + return False + text = BeautifulSoup(html, "html.parser").get_text(" ", strip=True) + return bool(text) or any(token in html.lower() for token in (" None: + for selector in ("script", "style", "noscript", "iframe", "form", "nav", ".share", ".social-share"): + for child in node.select(selector): + child.decompose() + + +def 
strip_dangerous_attributes(node: BeautifulSoup | Tag) -> None: + for child in node.find_all(True): + for attr_name in list(child.attrs): + normalized_name = attr_name.lower() + if normalized_name.startswith("on") or normalized_name == "srcdoc": + del child.attrs[attr_name] + continue + + if normalized_name not in {"href", "src", "action", "formaction", "xlink:href"}: + continue + + raw_value = child.attrs.get(attr_name) + if isinstance(raw_value, list): + candidate = " ".join(str(item) for item in raw_value) + else: + candidate = str(raw_value or "") + + lowered = candidate.strip().lower() + if lowered.startswith(("javascript:", "vbscript:", "data:text/html")): + del child.attrs[attr_name] + + +def clone_tag(node: Tag) -> BeautifulSoup: + return BeautifulSoup(str(node), "html.parser") + + +def find_title_node(soup: BeautifulSoup) -> Tag | None: + for selector in ("article h1", "h1.entry-title", "h1.page-title", "h1.title", "h1"): + node = soup.select_one(selector) + if node: + return node + return None + + +def extract_labeled_value(text: str, label: str) -> str: + if not text: + return "" + + pattern = re.compile( + rf"{re.escape(label)}:\s*(.+?)(?=\s+(?:[A-Z][a-z]+:)|\s{{2,}}|$)", + re.IGNORECASE, + ) + match = pattern.search(clean_text(text)) + if not match: + return "" + return clean_text(match.group(1)) + + +def normalize_department_category(value: str) -> str: + cleaned = clean_text(value) + if not cleaned: + return "" + if len(cleaned) > 80 or len(cleaned.split()) > 8: + return "" + if any(token in cleaned.lower() for token in ("p.o. 
box", "contact us", "@", "http://", "https://")): + return "" + return cleaned + + +def format_error_summary( + url: str, + exc: Exception, + response: requests.Response | None, + timeout_seconds: int, +) -> str: + if isinstance(exc, requests.HTTPError): + failing_response = exc.response or response + if failing_response is not None: + return ( + f"HTTP {failing_response.status_code} {failing_response.reason} " + f"while fetching {failing_response.url or url}" + ) + if isinstance(exc, requests.Timeout): + return f"Request timed out after {timeout_seconds}s while fetching {url}" + if isinstance(exc, requests.RequestException): + return f"{type(exc).__name__} while fetching {url}: {exc}" + return f"{type(exc).__name__}: {exc}" + + +def format_error_details( + url: str, + exc: Exception, + response: requests.Response | None, +) -> str: + details = [ + f"URL: {url}", + f"Error Type: {type(exc).__name__}", + f"Message: {exc}", + ] + + failing_response = getattr(exc, "response", None) or response + if failing_response is not None: + details.extend( + [ + f"HTTP Status: {failing_response.status_code} {failing_response.reason}", + f"Resolved URL: {failing_response.url}", + ] + ) + + trace = "".join(traceback.format_exception_only(type(exc), exc)).strip() + if trace: + details.append(f"Exception: {trace}") + + return "\n".join(details) diff --git a/Page Importer/page_importer/wxr.py b/Page Importer/page_importer/wxr.py new file mode 100644 index 0000000..0bb0c4f --- /dev/null +++ b/Page Importer/page_importer/wxr.py @@ -0,0 +1,91 @@ +from __future__ import annotations + +from email.utils import format_datetime +from io import StringIO +from xml.sax.saxutils import escape +import datetime as dt + +from page_importer.dates import parse_datetime +from page_importer.models import ScrapedPost + + +def build_wxr(posts: list[ScrapedPost], channel_title: str = "Imported Content") -> str: + now = dt.datetime.now(dt.timezone.utc) + out = StringIO() + out.write('\n') + out.write( + 
'\n' + ) + out.write("\n") + out.write(f"{escape(channel_title)}\n") + out.write("http://localhost/\n") + out.write("Generated by Page Importer\n") + out.write(f"{format_datetime(now)}\n") + out.write("en-US\n") + out.write("1.2\n") + + for post in posts: + local_date, gmt_date, item_pub_date = _resolve_post_dates(post.publish_date, now) + out.write("\n") + out.write(f"{escape(post.title)}\n") + out.write(f"{escape(post.source_url)}\n") + out.write(f"{format_datetime(item_pub_date)}\n") + out.write(f"{cdata(post.author or 'importer')}\n") + out.write(f"{escape(post.source_url)}\n") + out.write("\n") + out.write(f"{cdata(post.body_html)}\n") + out.write(f"{cdata('')}\n") + out.write(f"{cdata(local_date)}\n") + out.write(f"{cdata(gmt_date)}\n") + out.write("\n") + out.write("\n") + out.write("\n") + out.write(f"{cdata(post.status)}\n") + out.write("0\n") + out.write("0\n") + out.write(f"{cdata(post.post_type or 'post')}\n") + out.write("\n") + out.write("0\n") + for category in post.categories: + out.write( + f'{cdata(category)}\n' + ) + for tag in post.tags: + out.write( + f'{cdata(tag)}\n' + ) + out.write("\n") + + out.write("\n\n") + return out.getvalue() + + +def slugify(value: str) -> str: + return "".join(ch.lower() if ch.isalnum() else "-" for ch in value).strip("-") + + +def cdata(value: str) -> str: + return f"', ']]]]>')}]]>" + + +def _resolve_post_dates(value: str, fallback: dt.datetime) -> tuple[str, str, dt.datetime]: + parsed = parse_datetime(value) + if parsed is None: + return "", "", fallback + + if parsed.tzinfo is None or parsed.utcoffset() is None: + local_date = _format_wp_date(parsed) + assumed_utc = parsed.replace(tzinfo=dt.timezone.utc) + return local_date, local_date, assumed_utc + + local_date = _format_wp_date(parsed) + gmt_value = parsed.astimezone(dt.timezone.utc) + return local_date, _format_wp_date(gmt_value), gmt_value + + +def _format_wp_date(value: dt.datetime) -> str: + return value.replace(tzinfo=None).strftime("%Y-%m-%d %H:%M:%S") 
diff --git a/Page Importer/requirements.txt b/Page Importer/requirements.txt new file mode 100644 index 0000000..5ca142a --- /dev/null +++ b/Page Importer/requirements.txt @@ -0,0 +1,4 @@ +streamlit>=1.43,<2 +requests>=2.32,<3 +beautifulsoup4>=4.12,<5 +python-dateutil>=2.9,<3 diff --git a/Page Importer/tests/test_regressions.py b/Page Importer/tests/test_regressions.py new file mode 100644 index 0000000..aeab1a4 --- /dev/null +++ b/Page Importer/tests/test_regressions.py @@ -0,0 +1,79 @@ +from __future__ import annotations + +import unittest + +from bs4 import BeautifulSoup + +from page_importer.dates import normalize_date +from page_importer.scraper import CATEGORY_SELECTORS, TAG_SELECTORS, extract_terms, sanitize_html +from page_importer.wxr import build_wxr +from page_importer.models import ScrapedPost + + +class DateNormalizationTests(unittest.TestCase): + def test_preserves_timezone_offset_in_normalized_value(self) -> None: + self.assertEqual( + normalize_date("2024-05-01T09:30:00-07:00"), + "2024-05-01 09:30:00-07:00", + ) + + +class WxrSerializationTests(unittest.TestCase): + def test_writes_local_and_gmt_dates_from_offset_timestamp(self) -> None: + xml = build_wxr( + [ + ScrapedPost( + source_url="https://example.com/post", + title="Example", + body_html="

Body

", + publish_date="2024-05-01 09:30:00-07:00", + success=True, + ) + ] + ) + + self.assertIn("", xml) + self.assertIn("", xml) + self.assertIn("Wed, 01 May 2024 16:30:00 +0000", xml) + + def test_splits_cdata_terminators_in_content(self) -> None: + xml = build_wxr( + [ + ScrapedPost( + source_url="https://example.com/post", + title="Example", + body_html="

alpha ]]> omega

", + author="Jane ]]> Doe", + success=True, + ) + ] + ) + + self.assertIn("alpha ]]]]> omega", xml) + self.assertIn("Jane ]]]]> Doe", xml) + + +class HtmlSanitizationTests(unittest.TestCase): + def test_removes_inline_event_handlers_and_script_uris(self) -> None: + sanitized = sanitize_html( + '
x
' + ) + + self.assertNotIn("onclick", sanitized) + self.assertNotIn("onerror", sanitized) + self.assertNotIn("javascript:", sanitized) + + +class TaxonomySelectorTests(unittest.TestCase): + def test_drupal_tag_field_is_not_treated_as_category(self) -> None: + soup = BeautifulSoup( + '', + "html.parser", + ) + + self.assertEqual(extract_terms(soup, CATEGORY_SELECTORS), []) + self.assertEqual(extract_terms(soup, TAG_SELECTORS), ["Example Tag"]) + + +if __name__ == "__main__": + unittest.main() diff --git a/README.md b/README.md new file mode 100644 index 0000000..e86978c --- /dev/null +++ b/README.md @@ -0,0 +1,110 @@ +# WDW Sitemap And Import Tools + +This repository combines two internal tools into one web application and one Docker image: + +- `Sitemap Generator` +- `Page Importer` + +The application uses Streamlit and presents both tools behind a single URL with two tabs at the top of the page. + +## What It Does + +### Sitemap Generator + +- Crawls a site from a starting URL +- Discovers URLs from page links and XML sitemaps +- Exports a sitemap CSV +- Saves crawl state and logs so a crawl can be resumed later + +### Page Importer + +- Reads a CSV of submitted URLs +- Scrapes page content +- Lets you review the extracted content +- Exports a WordPress WXR XML import file + +## Project Layout + +- `app.py`: top-level Streamlit app with both tabs +- `requirements.txt`: shared Python dependencies for the combined app +- `Dockerfile`: single image for the combined tool +- `.gitea/workflows/docker-image.yml`: Gitea Actions workflow for Docker builds +- `Sitemap Builder/`: sitemap crawler logic +- `Page Importer/`: WordPress import logic + +## Run Locally + +### Linux or macOS + +```bash +python3 -m venv .venv +source .venv/bin/activate +pip install -r requirements.txt +streamlit run app.py +``` + +### Windows PowerShell + +```powershell +python -m venv .venv +.venv\Scripts\Activate.ps1 +pip install -r requirements.txt +streamlit run app.py +``` + +Then open: + 
+```text +http://localhost:8501 +``` + +## Docker + +Build the image: + +```bash +docker build -t wdw-sitemap-and-importer . +``` + +Run the container: + +```bash +docker run --rm -p 8501:8501 -v wdw-tools-data:/data wdw-sitemap-and-importer +``` + +Then open: + +```text +http://localhost:8501 +``` + +The mounted `/data` volume stores sitemap CSV files, crawl state files, and crawl logs so sitemap jobs can survive container restarts. + +## Gitea Automation + +The workflow file is: + +```text +.gitea/workflows/docker-image.yml +``` + +It runs on pushes to `main` and on manual workflow dispatch. + +The workflow always builds the Docker image. If these secrets are configured in Gitea, it also logs in and pushes the image to your registry: + +- `REGISTRY_URL` +- `REGISTRY_USERNAME` +- `REGISTRY_PASSWORD` + +Published tags: + +- `${REGISTRY}/wdw-sitemap-and-importer:${GITHUB_SHA}` +- `${REGISTRY}/wdw-sitemap-and-importer:latest` + +If the registry secrets are not configured, the workflow still performs the build as validation but skips the push steps. + +## Notes + +- Sitemap output files are written under `/data` in Docker. +- The sitemap crawler can resume previous runs when a matching crawl state file exists. +- The importer keeps its existing scraping and WordPress export behavior, but it now runs inside the shared interface instead of as a separate app. diff --git a/Sitemap Builder/README.md b/Sitemap Builder/README.md new file mode 100644 index 0000000..4d6f4a4 --- /dev/null +++ b/Sitemap Builder/README.md @@ -0,0 +1,80 @@ +# Sitemap Builder + +This folder contains the sitemap crawler used by the combined web application in the repository root. 
+ +The crawler can still be used directly from Python, but the primary supported experience is now the shared Streamlit interface in the root project: + +```text +../app.py +``` + +## Current Role In The Combined App + +The root application uses this module to: + +- crawl a site from a submitted starting URL +- discover internal URLs from HTML links and XML sitemaps +- export a sitemap CSV +- save crawl state and crawl logs for resume support + +## Output + +The crawler writes: + +- a CSV file +- a sidecar crawl state file ending in `.crawlstate.json` +- a crawl log file ending in `.crawl.log` + +The CSV contains these columns: + +- `URL` +- `Title` +- `Canonical URL` +- `Type` + +## Standalone CLI Usage + +Interactive mode: + +```bash +python3 sitemap_builder.py +``` + +Command line mode: + +```bash +python3 sitemap_builder.py https://example.com -o ./sitemap.csv +``` + +On Windows: + +```powershell +python .\sitemap_builder.py https://example.com -o .\sitemap.csv +``` + +## Useful Options + +```bash +python3 sitemap_builder.py https://example.com --max-pages 20000 --delay 0.25 --include-subdomains +``` + +- `--max-pages`: stop after the given number of visited pages. Default: `10000` +- `--delay`: wait between requests to reduce load on the site +- `--timeout`: request timeout in seconds +- `--include-subdomains`: crawl subdomains of the starting host +- `--include-documents`: include document links such as PDF, CSV, DOC, DOCX, XLSX, and similar files +- `--workers`: number of worker threads to use. Set `1` to disable multithreading +- `--save-every`: save progress after every N pages. 
Default: `25` +- `--resume`: resume from an existing state file +- `--fresh`: ignore the existing state file and start over + +## Discovery And Behavior + +- The crawler checks `robots.txt` for sitemap references and also tries `/sitemap.xml` +- XML sitemap URLs are added to the crawl queue before page crawling begins +- HTML pages store page title and canonical URL in the CSV when available +- On Windows CLI runs, `P` pauses, `R` resumes, and `Q` stops cleanly and saves progress + +## Recommendation + +For normal use, run the root application or Docker container instead of calling this script directly. That is now the intended user interface for this repository. diff --git a/Sitemap Builder/sitemap_builder.py b/Sitemap Builder/sitemap_builder.py new file mode 100644 index 0000000..454993b --- /dev/null +++ b/Sitemap Builder/sitemap_builder.py @@ -0,0 +1,947 @@ +from __future__ import annotations + +import argparse +import csv +import json +import os +import sys +import time +import xml.etree.ElementTree as ET +from collections import deque +from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait +from dataclasses import dataclass +from html.parser import HTMLParser +from pathlib import Path +from typing import Iterable +from urllib.error import HTTPError, URLError +from urllib.parse import urljoin, urlsplit, urlunsplit +from urllib.request import Request, urlopen + +if os.name == "nt": + import msvcrt + + +DEFAULT_USER_AGENT = "SitemapBuilder/1.0 (+local script)" +DEFAULT_OUTPUT_NAME = "sitemap.csv" +DEFAULT_STATE_SUFFIX = ".crawlstate.json" +DEFAULT_LOG_SUFFIX = ".crawl.log" +DEFAULT_MAX_PAGES = 10000 +DEFAULT_RESUME_PAGE_INCREMENT = 10000 +DEFAULT_SAVE_EVERY = 25 +DEFAULT_WORKERS = 8 +SCRIPT_DIR = Path(__file__).resolve().parent +DOCUMENT_EXTENSIONS = { + ".pdf", + ".csv", + ".doc", + ".docx", + ".xls", + ".xlsx", + ".ppt", + ".pptx", + ".txt", + ".rtf", + ".zip", + ".xml", + ".json", +} + + +@dataclass +class CrawlResult: + url: str + links: 
list[str] + title: str = "" + canonical_url: str = "" + skipped: bool = False + error: str | None = None + + +@dataclass +class CrawlState: + start_url: str + include_subdomains: bool + include_documents: bool + visited: set[str] + queued: set[str] + queue: deque[str] + records: dict[str, dict[str, str]] + alias_to_canonical: dict[str, str] + errors: list[dict[str, str]] + skipped_count: int + discovered_from_sitemaps: int + + +@dataclass +class RuntimeControl: + paused: bool = False + stop_requested: bool = False + + +@dataclass +class CrawlRunResult: + state: CrawlState + user_stopped: bool + output_path: Path + state_path: Path + log_path: Path + max_pages: int + workers: int + + +class HTMLPageParser(HTMLParser): + def __init__(self) -> None: + super().__init__() + self.links: list[str] = [] + self.title_parts: list[str] = [] + self.in_title = False + self.canonical_href = "" + + def handle_starttag(self, tag: str, attrs: list[tuple[str, str | None]]) -> None: + attrs_map = {key.lower(): value for key, value in attrs} + lower_tag = tag.lower() + + if lower_tag == "a": + href = attrs_map.get("href") + if href: + self.links.append(href) + + if lower_tag == "title": + self.in_title = True + + if lower_tag == "link": + rel = (attrs_map.get("rel") or "").lower() + href = attrs_map.get("href") or "" + if "canonical" in rel and href: + self.canonical_href = href + + def handle_endtag(self, tag: str) -> None: + if tag.lower() == "title": + self.in_title = False + + def handle_data(self, data: str) -> None: + if self.in_title: + self.title_parts.append(data) + + @property + def title(self) -> str: + return " ".join(part.strip() for part in self.title_parts if part.strip()).strip() + + +def normalize_url(url: str) -> str: + parts = urlsplit(url.strip()) + scheme = parts.scheme.lower() or "https" + netloc = parts.netloc.lower() + path = parts.path or "/" + + if path != "/" and path.endswith("/"): + path = path.rstrip("/") + + return urlunsplit((scheme, netloc, path, 
parts.query, "")) + + +def is_http_url(url: str) -> bool: + return urlsplit(url).scheme in {"http", "https"} + + +def build_allowed_hosts(start_url: str) -> set[str]: + return {urlsplit(start_url).netloc.lower()} + + +def should_visit(url: str, allowed_hosts: set[str], include_subdomains: bool) -> bool: + if not is_http_url(url): + return False + + host = urlsplit(url).netloc.lower() + if include_subdomains: + return any(host == allowed or host.endswith(f".{allowed}") for allowed in allowed_hosts) + return host in allowed_hosts + + +def is_document_url(url: str) -> bool: + return Path(urlsplit(url).path).suffix.lower() in DOCUMENT_EXTENSIONS + + +def should_record_url(url: str) -> bool: + query = urlsplit(url).query.lower() + return query != "page=1" + + +def get_state_path(output_path: Path) -> Path: + return output_path.with_suffix(output_path.suffix + DEFAULT_STATE_SUFFIX) + + +def get_log_path(output_path: Path) -> Path: + return output_path.with_suffix(output_path.suffix + DEFAULT_LOG_SUFFIX) + + +def log_message(log_path: Path, message: str) -> None: + log_path.parent.mkdir(parents=True, exist_ok=True) + timestamp = time.strftime("%Y-%m-%d %H:%M:%S") + with log_path.open("a", encoding="utf-8") as log_file: + log_file.write(f"[{timestamp}] {message}\n") + + +def resolve_alias(url: str, alias_to_canonical: dict[str, str]) -> str: + resolved = url + seen: set[str] = set() + while resolved in alias_to_canonical and resolved not in seen: + seen.add(resolved) + resolved = alias_to_canonical[resolved] + return resolved + + +def register_record( + state: CrawlState, + url: str, + record_type: str, + title: str = "", + canonical_url: str = "", +) -> None: + if not should_record_url(url): + return + + existing = state.records.get(url, {"title": "", "canonical_url": "", "type": record_type}) + if not existing.get("type"): + existing["type"] = record_type + elif existing["type"] == "document" and record_type == "page": + existing["type"] = "page" + + if title and not 
existing.get("title"): + existing["title"] = title + if canonical_url and not existing.get("canonical_url"): + existing["canonical_url"] = canonical_url + if "canonical_url" not in existing: + existing["canonical_url"] = canonical_url + if "title" not in existing: + existing["title"] = title + state.records[url] = existing + + +def save_state(state: CrawlState, state_path: Path, output_path: Path) -> None: + state_path.parent.mkdir(parents=True, exist_ok=True) + payload = { + "start_url": state.start_url, + "include_subdomains": state.include_subdomains, + "include_documents": state.include_documents, + "visited": sorted(state.visited), + "queued": sorted(state.queued), + "queue": list(state.queue), + "records": state.records, + "alias_to_canonical": state.alias_to_canonical, + "errors": state.errors, + "skipped_count": state.skipped_count, + "discovered_from_sitemaps": state.discovered_from_sitemaps, + "saved_at": time.strftime("%Y-%m-%d %H:%M:%S"), + "output_path": str(output_path), + } + state_path.write_text(json.dumps(payload, indent=2), encoding="utf-8") + + +def load_state(state_path: Path) -> CrawlState: + payload = json.loads(state_path.read_text(encoding="utf-8")) + return CrawlState( + start_url=payload["start_url"], + include_subdomains=bool(payload.get("include_subdomains", False)), + include_documents=bool(payload.get("include_documents", False)), + visited=set(payload.get("visited", [])), + queued=set(payload.get("queued", [])), + queue=deque(payload.get("queue", [])), + records=dict(payload.get("records", {})), + alias_to_canonical=dict(payload.get("alias_to_canonical", {})), + errors=list(payload.get("errors", [])), + skipped_count=int(payload.get("skipped_count", 0)), + discovered_from_sitemaps=int(payload.get("discovered_from_sitemaps", 0)), + ) + + +def initialize_state(start_url: str, include_subdomains: bool, include_documents: bool) -> CrawlState: + normalized_start = normalize_url(start_url) + return CrawlState( + start_url=normalized_start, 
+ include_subdomains=include_subdomains, + include_documents=include_documents, + visited=set(), + queued={normalized_start}, + queue=deque([normalized_start]), + records={}, + alias_to_canonical={}, + errors=[], + skipped_count=0, + discovered_from_sitemaps=0, + ) + + +def prompt_if_missing(value: str | None, prompt_text: str) -> str: + if value: + return value + return input(prompt_text).strip() + + +def prompt_yes_no(prompt_text: str, default: bool) -> bool: + suffix = "Y/n" if default else "y/N" + answer = input(f"{prompt_text} [{suffix}]: ").strip().lower() + if not answer: + return default + return answer in {"y", "yes"} + + +def write_csv(records: dict[str, dict[str, str]], output_path: Path) -> None: + output_path.parent.mkdir(parents=True, exist_ok=True) + with output_path.open("w", newline="", encoding="utf-8") as csv_file: + writer = csv.writer(csv_file) + writer.writerow(["URL", "Title", "Canonical URL", "Type"]) + for url in sorted(records): + record = records[url] + writer.writerow( + [ + url, + record.get("title", ""), + record.get("canonical_url", ""), + record.get("type", ""), + ] + ) + + +def fetch_text(url: str, timeout: float, user_agent: str, accept: str) -> tuple[str | None, str | None]: + request = Request(url, headers={"User-Agent": user_agent, "Accept": accept}) + try: + with urlopen(request, timeout=timeout) as response: + return ( + response.read().decode(response.headers.get_content_charset() or "utf-8", errors="replace"), + None, + ) + except HTTPError as exc: + return None, f"HTTP {exc.code}" + except URLError as exc: + return None, str(exc.reason) + except TimeoutError: + return None, "request timed out" + except Exception as exc: # pragma: no cover + return None, str(exc) + + +def fetch_page(url: str, timeout: float, user_agent: str) -> CrawlResult: + request = Request( + url, + headers={ + "User-Agent": user_agent, + "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8", + }, + ) + + try: + with 
urlopen(request, timeout=timeout) as response: + content_type = response.headers.get("Content-Type", "").lower() + if "text/html" not in content_type and "application/xhtml+xml" not in content_type: + return CrawlResult(url=url, links=[], skipped=True) + + content = response.read().decode(response.headers.get_content_charset() or "utf-8", errors="replace") + except HTTPError as exc: + return CrawlResult(url=url, links=[], error=f"HTTP {exc.code}") + except URLError as exc: + return CrawlResult(url=url, links=[], error=str(exc.reason)) + except TimeoutError: + return CrawlResult(url=url, links=[], error="request timed out") + except Exception as exc: # pragma: no cover + return CrawlResult(url=url, links=[], error=str(exc)) + + parser = HTMLPageParser() + parser.feed(content) + canonical_url = normalize_url(urljoin(url, parser.canonical_href)) if parser.canonical_href else "" + return CrawlResult( + url=url, + links=parser.links, + title=parser.title, + canonical_url=canonical_url, + ) + + +def fetch_page_with_delay(url: str, timeout: float, user_agent: str, delay: float) -> CrawlResult: + if delay > 0: + time.sleep(delay) + return fetch_page(url, timeout=timeout, user_agent=user_agent) + + +def print_progress(state: CrawlState, max_pages: int, current_url: str) -> None: + print( + f"[{len(state.visited)}/{max_pages}] Found {len(state.records)} URL(s), " + f"queued {len(state.queue)} more: {current_url}" + ) + + +def poll_runtime_control(control: RuntimeControl, log_path: Path) -> None: + if os.name != "nt": + return + + while msvcrt.kbhit(): + key = msvcrt.getwch().lower() + if key == "p" and not control.paused: + control.paused = True + print("Paused. 
Press R to resume or Q to stop.") + log_message(log_path, "Crawl paused by user") + elif key == "r" and control.paused: + control.paused = False + print("Resuming crawl.") + log_message(log_path, "Crawl resumed by user") + elif key == "q": + control.stop_requested = True + log_message(log_path, "Stop requested by user") + + +def discover_robots_sitemaps( + start_url: str, + timeout: float, + user_agent: str, + log_path: Path, +) -> set[str]: + robots_url = normalize_url(urljoin(start_url, "/robots.txt")) + content, error = fetch_text(robots_url, timeout, user_agent, "text/plain,*/*;q=0.8") + if error: + log_message(log_path, f"robots.txt not available at {robots_url}: {error}") + return set() + + sitemap_urls: set[str] = set() + for line in content.splitlines(): + if line.lower().startswith("sitemap:"): + raw_url = line.split(":", 1)[1].strip() + if raw_url: + sitemap_urls.add(normalize_url(raw_url)) + + if sitemap_urls: + log_message(log_path, f"Discovered {len(sitemap_urls)} sitemap reference(s) from robots.txt") + return sitemap_urls + + +def xml_local_name(tag: str) -> str: + if "}" in tag: + return tag.rsplit("}", 1)[1] + return tag + + +def parse_sitemap_urls( + sitemap_url: str, + allowed_hosts: set[str], + include_subdomains: bool, + timeout: float, + user_agent: str, + log_path: Path, + seen_sitemaps: set[str], +) -> set[str]: + normalized_sitemap = normalize_url(sitemap_url) + if normalized_sitemap in seen_sitemaps: + return set() + seen_sitemaps.add(normalized_sitemap) + + if not should_visit(normalized_sitemap, allowed_hosts, include_subdomains): + return set() + + content, error = fetch_text(normalized_sitemap, timeout, user_agent, "application/xml,text/xml;q=0.9,*/*;q=0.8") + if error: + log_message(log_path, f"Sitemap fetch failed for {normalized_sitemap}: {error}") + return set() + + try: + root = ET.fromstring(content) + except ET.ParseError as exc: + log_message(log_path, f"Sitemap parse failed for {normalized_sitemap}: {exc}") + return set() + + 
tag_name = xml_local_name(root.tag) + discovered_urls: set[str] = set() + + if tag_name == "urlset": + for element in root.findall(".//"): + if xml_local_name(element.tag) == "loc" and element.text: + normalized = normalize_url(element.text.strip()) + if should_visit(normalized, allowed_hosts, include_subdomains): + discovered_urls.add(normalized) + elif tag_name == "sitemapindex": + for element in root.findall(".//"): + if xml_local_name(element.tag) == "loc" and element.text: + child_sitemap = normalize_url(element.text.strip()) + discovered_urls.update( + parse_sitemap_urls( + child_sitemap, + allowed_hosts, + include_subdomains, + timeout, + user_agent, + log_path, + seen_sitemaps, + ) + ) + else: + log_message(log_path, f"Unsupported sitemap format at {normalized_sitemap}") + + return discovered_urls + + +def seed_from_xml_sitemaps( + state: CrawlState, + timeout: float, + user_agent: str, + log_path: Path, +) -> None: + allowed_hosts = build_allowed_hosts(state.start_url) + sitemap_candidates = discover_robots_sitemaps(state.start_url, timeout, user_agent, log_path) + sitemap_candidates.add(normalize_url(urljoin(state.start_url, "/sitemap.xml"))) + + seen_sitemaps: set[str] = set() + discovered_urls: set[str] = set() + for sitemap_url in sitemap_candidates: + discovered_urls.update( + parse_sitemap_urls( + sitemap_url, + allowed_hosts, + state.include_subdomains, + timeout, + user_agent, + log_path, + seen_sitemaps, + ) + ) + + added = 0 + for url in discovered_urls: + canonical_url = resolve_alias(url, state.alias_to_canonical) + if is_document_url(canonical_url): + if state.include_documents: + register_record(state, canonical_url, "document") + added += 1 + continue + + register_record(state, canonical_url, "page") + if canonical_url not in state.visited and canonical_url not in state.queued: + state.queue.append(canonical_url) + state.queued.add(canonical_url) + added += 1 + + state.discovered_from_sitemaps += added + log_message(log_path, f"Added {added} 
URL(s) from XML sitemap discovery") + + +def process_crawl_result( + state: CrawlState, + result: CrawlResult, + allowed_hosts: set[str], + log_path: Path, +) -> None: + if result.error: + state.errors.append({"url": result.url, "error": result.error}) + log_message(log_path, f"Error fetching {result.url}: {result.error}") + return + + if result.skipped: + state.skipped_count += 1 + register_record(state, result.url, "document") + return + + canonical_url = "" + if result.canonical_url and should_visit(result.canonical_url, allowed_hosts, state.include_subdomains): + canonical_url = resolve_alias(result.canonical_url, state.alias_to_canonical) + state.alias_to_canonical[result.url] = canonical_url + register_record(state, canonical_url, "page", title=result.title, canonical_url=canonical_url) + if canonical_url not in state.visited and canonical_url not in state.queued: + state.queue.append(canonical_url) + state.queued.add(canonical_url) + register_record(state, result.url, "page", title=result.title, canonical_url=canonical_url) + + for raw_link in result.links: + absolute = normalize_url(urljoin(result.url, raw_link)) + if not should_visit(absolute, allowed_hosts, state.include_subdomains): + continue + + absolute = resolve_alias(absolute, state.alias_to_canonical) + if is_document_url(absolute): + if state.include_documents: + register_record(state, absolute, "document") + continue + + register_record(state, absolute, "page") + if absolute not in state.queued and absolute not in state.visited: + state.queue.append(absolute) + state.queued.add(absolute) + + +def crawl_site( + state: CrawlState, + max_pages: int, + delay: float, + timeout: float, + user_agent: str, + state_path: Path, + output_path: Path, + log_path: Path, + save_every: int, + workers: int, +) -> tuple[CrawlState, bool]: + allowed_hosts = build_allowed_hosts(state.start_url) + processed_since_save = 0 + user_stopped = False + control = RuntimeControl() + + if workers <= 1: + while state.queue and 
len(state.visited) < max_pages: + poll_runtime_control(control, log_path) + if control.stop_requested: + user_stopped = True + print("Stop requested. Saving progress and finishing cleanly...") + break + + while control.paused and not control.stop_requested: + time.sleep(0.2) + poll_runtime_control(control, log_path) + + if control.stop_requested: + user_stopped = True + print("Stop requested. Saving progress and finishing cleanly...") + break + + current = resolve_alias(state.queue.popleft(), state.alias_to_canonical) + if current in state.visited: + continue + + state.visited.add(current) + register_record(state, current, "page") + print_progress(state, max_pages, current) + + result = fetch_page_with_delay(current, timeout=timeout, user_agent=user_agent, delay=delay) + process_crawl_result(state, result, allowed_hosts, log_path) + + processed_since_save += 1 + if processed_since_save >= save_every: + write_csv(state.records, output_path) + save_state(state, state_path, output_path) + log_message(log_path, f"Saved progress after {len(state.visited)} visited page(s)") + processed_since_save = 0 + else: + with ThreadPoolExecutor(max_workers=workers) as executor: + pending: dict[object, str] = {} + + while pending or (state.queue and len(state.visited) < max_pages): + poll_runtime_control(control, log_path) + + if control.stop_requested: + user_stopped = True + print("Stop requested. No new pages will be queued. 
Waiting for active requests to finish...") + break + + if control.paused: + if pending: + completed, _ = wait(pending.keys(), timeout=0.2, return_when=FIRST_COMPLETED) + for future in completed: + pending.pop(future, None) + result = future.result() + process_crawl_result(state, result, allowed_hosts, log_path) + processed_since_save += 1 + else: + time.sleep(0.2) + + if processed_since_save >= save_every: + write_csv(state.records, output_path) + save_state(state, state_path, output_path) + log_message(log_path, f"Saved progress after {len(state.visited)} visited page(s)") + processed_since_save = 0 + continue + + while state.queue and len(pending) < workers and len(state.visited) < max_pages: + current = resolve_alias(state.queue.popleft(), state.alias_to_canonical) + if current in state.visited: + continue + + state.visited.add(current) + register_record(state, current, "page") + print_progress(state, max_pages, current) + future = executor.submit(fetch_page_with_delay, current, timeout, user_agent, delay) + pending[future] = current + + if not pending: + continue + + completed, _ = wait(pending.keys(), timeout=0.2, return_when=FIRST_COMPLETED) + for future in completed: + pending.pop(future, None) + result = future.result() + process_crawl_result(state, result, allowed_hosts, log_path) + processed_since_save += 1 + + if processed_since_save >= save_every: + write_csv(state.records, output_path) + save_state(state, state_path, output_path) + log_message(log_path, f"Saved progress after {len(state.visited)} visited page(s)") + processed_since_save = 0 + + if user_stopped and pending: + completed, _ = wait(pending.keys()) + for future in completed: + pending.pop(future, None) + result = future.result() + process_crawl_result(state, result, allowed_hosts, log_path) + + write_csv(state.records, output_path) + save_state(state, state_path, output_path) + log_message(log_path, f"Final save completed with {len(state.records)} URL(s) recorded") + return state, 
user_stopped + + +def parse_args() -> argparse.Namespace: + parser = argparse.ArgumentParser( + description="Crawl a website and export discovered internal URLs to a CSV sitemap.", + ) + parser.add_argument("url", nargs="?", help="Starting URL to crawl, for example https://example.com") + parser.add_argument( + "-o", + "--output", + help=f"Output CSV path. Defaults to {DEFAULT_OUTPUT_NAME} in the script folder.", + ) + parser.add_argument( + "--max-pages", + type=int, + default=DEFAULT_MAX_PAGES, + help=f"Maximum number of pages to crawl before stopping. Default: {DEFAULT_MAX_PAGES}", + ) + parser.add_argument( + "--delay", + type=float, + default=0.0, + help="Delay in seconds between requests. Default: 0", + ) + parser.add_argument( + "--timeout", + type=float, + default=15.0, + help="Request timeout in seconds. Default: 15", + ) + parser.add_argument( + "--include-subdomains", + action="store_true", + help="Also crawl subdomains of the starting host.", + ) + parser.add_argument( + "--include-documents", + action="store_true", + help="Include document links like PDF, CSV, DOC, and DOCX in the sitemap output.", + ) + parser.add_argument( + "--save-every", + type=int, + default=DEFAULT_SAVE_EVERY, + help=f"Save progress after this many pages. Default: {DEFAULT_SAVE_EVERY}", + ) + parser.add_argument( + "--resume", + action="store_true", + help="Resume from the saved crawl state if a state file already exists.", + ) + parser.add_argument( + "--fresh", + action="store_true", + help="Ignore any saved crawl state and start over.", + ) + parser.add_argument( + "--workers", + type=int, + default=0, + help=f"Number of worker threads. Use 1 to disable multithreading. 
Default when prompted on: {DEFAULT_WORKERS}", + ) + return parser.parse_args() + + +def run_crawl( + *, + start_url: str, + output_path: Path, + max_pages: int = DEFAULT_MAX_PAGES, + delay: float = 0.0, + timeout: float = 15.0, + include_subdomains: bool = False, + include_documents: bool = False, + save_every: int = DEFAULT_SAVE_EVERY, + workers: int = DEFAULT_WORKERS, + resume: bool = True, + fresh: bool = False, + user_agent: str = DEFAULT_USER_AGENT, +) -> CrawlRunResult: + if not start_url: + raise ValueError("A starting URL is required.") + + if "://" not in start_url: + start_url = f"https://{start_url}" + + normalized_start = normalize_url(start_url) + if not is_http_url(normalized_start): + raise ValueError("Only http and https URLs are supported.") + + output_path = Path(output_path) + state_path = get_state_path(output_path) + log_path = get_log_path(output_path) + + state: CrawlState + if state_path.exists() and not fresh and resume: + state = load_state(state_path) + if state.start_url != normalized_start: + raise ValueError( + "The saved crawl state belongs to a different starting URL. " + "Use a different output name or start a fresh crawl." + ) + if state.include_documents != include_documents: + raise ValueError( + "The saved crawl state uses a different document setting. " + "Keep the same choice or start a fresh crawl." 
+ ) + else: + state = initialize_state(normalized_start, include_subdomains, include_documents) + + effective_workers = max(int(workers), 1) + effective_max_pages = max(int(max_pages), 1) + if state.visited: + effective_max_pages = max(effective_max_pages, len(state.visited) + DEFAULT_RESUME_PAGE_INCREMENT) + else: + seed_from_xml_sitemaps(state, max(timeout, 1.0), user_agent, log_path) + + log_message(log_path, f"Starting crawl for {state.start_url}") + log_message(log_path, f"Output CSV: {output_path.resolve()}") + log_message(log_path, f"State file: {state_path.resolve()}") + log_message(log_path, f"Multithreading workers: {effective_workers}") + log_message(log_path, f"Include documents: {state.include_documents}") + + state, user_stopped = crawl_site( + state=state, + max_pages=effective_max_pages, + delay=max(delay, 0.0), + timeout=max(timeout, 1.0), + user_agent=user_agent, + state_path=state_path, + output_path=output_path, + log_path=log_path, + save_every=max(save_every, 1), + workers=effective_workers, + ) + + if user_stopped: + log_message(log_path, "Crawl stopped by user") + elif state.queue and len(state.visited) >= effective_max_pages: + log_message(log_path, "Crawl stopped at max page limit") + elif state.queue: + log_message(log_path, "Crawl stopped before queue emptied") + else: + log_message(log_path, "Crawl completed with empty queue") + + return CrawlRunResult( + state=state, + user_stopped=user_stopped, + output_path=output_path, + state_path=state_path, + log_path=log_path, + max_pages=effective_max_pages, + workers=effective_workers, + ) + + +def main() -> int: + args = parse_args() + + start_url = prompt_if_missing(args.url, "Enter the website URL to crawl: ") + if not start_url: + print("A starting URL is required.", file=sys.stderr) + return 1 + + if "://" not in start_url: + start_url = f"https://{start_url}" + + normalized_start = normalize_url(start_url) + if not is_http_url(normalized_start): + print("Only http and https URLs are 
supported.", file=sys.stderr) + return 1 + + output_value = prompt_if_missing(args.output, f"Enter output CSV path [{DEFAULT_OUTPUT_NAME}]: ") + output_path = Path(output_value) if output_value else SCRIPT_DIR / DEFAULT_OUTPUT_NAME + state_path = get_state_path(output_path) + log_path = get_log_path(output_path) + include_documents = args.include_documents or prompt_yes_no( + "Include document links such as PDF, CSV, DOC, and DOCX in the sitemap?", + default=False, + ) + workers = args.workers + if workers <= 0: + enable_multithreading = prompt_yes_no( + f"Enable multithreading for faster scanning? {DEFAULT_WORKERS} worker threads will be used.", + default=True, + ) + workers = DEFAULT_WORKERS if enable_multithreading else 1 + + print(f"Crawling {normalized_start}") + print(f"Output file: {output_path.resolve()}") + print(f"State file: {state_path.resolve()}") + print(f"Log file: {log_path.resolve()}") + resume_existing = False + if state_path.exists() and not args.fresh: + resume_existing = args.resume or prompt_yes_no( + f"Found saved crawl state at {state_path.name}. 
Resume from where it left off?", + default=True, + ) + + try: + run_result = run_crawl( + start_url=normalized_start, + output_path=output_path, + max_pages=args.max_pages, + delay=args.delay, + timeout=args.timeout, + include_subdomains=args.include_subdomains, + include_documents=include_documents, + save_every=args.save_every, + workers=workers, + resume=resume_existing, + fresh=args.fresh, + user_agent=DEFAULT_USER_AGENT, + ) + except ValueError as exc: + print(str(exc), file=sys.stderr) + return 1 + + state = run_result.state + user_stopped = run_result.user_stopped + effective_max_pages = run_result.max_pages + + print(f"Max pages: {effective_max_pages}") + print(f"Include documents: {'Yes' if state.include_documents else 'No'}") + print(f"Multithreading: {'Yes' if run_result.workers > 1 else 'No'}") + print(f"Worker threads: {run_result.workers}") + if os.name == "nt": + print("Press P to pause, R to resume, or Q to stop cleanly and save progress.") + if resume_existing: + print("Resumed from the existing crawl state file.") + log_message(log_path, "Resumed from existing crawl state") + + print(f"Found {len(state.records)} unique URL(s).") + print(f"Visited pages: {len(state.visited)}") + print(f"Queued pages remaining: {len(state.queue)}") + print(f"URLs added from XML sitemaps: {state.discovered_from_sitemaps}") + if state.errors: + print(f"Pages with errors: {len(state.errors)}") + for result in state.errors[:10]: + print(f" {result['url']} -> {result['error']}") + if state.skipped_count: + print(f"Non-HTML pages skipped while crawling: {state.skipped_count}") + + if user_stopped: + print("Stopped by user. Run it again to continue from the saved state.") + log_message(log_path, "Crawl stopped by user") + elif state.queue and len(state.visited) >= effective_max_pages: + print("Stopped because the max page limit was reached. 
Run it again to continue.") + log_message(log_path, "Crawl stopped at max page limit") + elif state.queue: + print("Stopped before the queue was empty. Run it again to continue.") + log_message(log_path, "Crawl stopped before queue emptied") + else: + print("Crawl complete. No queued pages remain.") + log_message(log_path, "Crawl completed with empty queue") + + print("Done.") + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/app.py b/app.py new file mode 100644 index 0000000..9c310a2 --- /dev/null +++ b/app.py @@ -0,0 +1,210 @@ +from __future__ import annotations + +import contextlib +import csv +import importlib.util +import io +import os +import re +import sys +from pathlib import Path + +import streamlit as st + + +ROOT_DIR = Path(__file__).resolve().parent +PAGE_IMPORTER_DIR = ROOT_DIR / "Page Importer" +SITEMAP_BUILDER_PATH = ROOT_DIR / "Sitemap Builder" / "sitemap_builder.py" +APP_DATA_DIR = Path(os.environ.get("APP_DATA_DIR", ROOT_DIR / ".data")).resolve() +SITEMAP_OUTPUT_DIR = APP_DATA_DIR / "sitemaps" + + +def load_module(module_name: str, file_path: Path): + spec = importlib.util.spec_from_file_location(module_name, file_path) + if spec is None or spec.loader is None: + raise RuntimeError(f"Unable to load module from {file_path}") + module = importlib.util.module_from_spec(spec) + sys.modules[module_name] = module + spec.loader.exec_module(module) + return module + + +def get_page_importer_module(): + if str(PAGE_IMPORTER_DIR) not in sys.path: + sys.path.insert(0, str(PAGE_IMPORTER_DIR)) + return load_module("page_importer_streamlit", PAGE_IMPORTER_DIR / "app.py") + + +def get_sitemap_module(): + return load_module("sitemap_builder_module", SITEMAP_BUILDER_PATH) + + +def sanitize_job_name(value: str) -> str: + cleaned = re.sub(r"[^A-Za-z0-9._-]+", "-", (value or "").strip()) + cleaned = cleaned.strip(".-") + return cleaned or "sitemap" + + +def read_csv_preview(csv_bytes: bytes, limit: int = 200) -> list[dict[str, str]]: 
+ text = csv_bytes.decode("utf-8-sig", errors="replace") + reader = csv.DictReader(io.StringIO(text)) + rows: list[dict[str, str]] = [] + for index, row in enumerate(reader): + if index >= limit: + break + rows.append(dict(row)) + return rows + + +def render_sitemap_tab() -> None: + st.title("Sitemap Generator") + st.caption("Crawl a site, export a sitemap CSV, and keep resume data inside the container data volume.") + + SITEMAP_OUTPUT_DIR.mkdir(parents=True, exist_ok=True) + + with st.form("sitemap-form"): + start_url = st.text_input("Starting URL", placeholder="https://example.com") + job_name = st.text_input( + "Output name", + value="sitemap", + help="Used for the CSV, crawl state, and log file names.", + ) + + col1, col2, col3 = st.columns(3) + with col1: + max_pages = st.number_input("Max pages", min_value=1, value=10000, step=100) + workers = st.number_input("Worker threads", min_value=1, value=8, step=1) + with col2: + delay = st.number_input("Delay between requests (seconds)", min_value=0.0, value=0.0, step=0.25) + timeout = st.number_input("Request timeout (seconds)", min_value=1.0, value=15.0, step=1.0) + with col3: + save_every = st.number_input("Save progress every N pages", min_value=1, value=25, step=1) + include_subdomains = st.checkbox("Include subdomains", value=False) + include_documents = st.checkbox("Include document links", value=False) + + resume_existing = st.checkbox("Resume from saved crawl state if present", value=True) + start_fresh = st.checkbox("Ignore any saved crawl state and start fresh", value=False) + submitted = st.form_submit_button("Run Sitemap Crawl", type="primary") + + if submitted: + if not start_url.strip(): + st.error("Starting URL is required.") + else: + sitemap_builder = get_sitemap_module() + safe_name = sanitize_job_name(job_name) + output_path = SITEMAP_OUTPUT_DIR / f"{safe_name}.csv" + captured_stdout = io.StringIO() + + try: + with st.spinner("Running sitemap crawl..."): + with 
contextlib.redirect_stdout(captured_stdout): + result = sitemap_builder.run_crawl( + start_url=start_url, + output_path=output_path, + max_pages=int(max_pages), + delay=float(delay), + timeout=float(timeout), + include_subdomains=include_subdomains, + include_documents=include_documents, + save_every=int(save_every), + workers=int(workers), + resume=resume_existing, + fresh=start_fresh, + ) + except Exception as exc: + st.error(str(exc)) + else: + st.session_state["sitemap_result"] = { + "summary": { + "records": len(result.state.records), + "visited": len(result.state.visited), + "queued": len(result.state.queue), + "errors": len(result.state.errors), + "skipped": result.state.skipped_count, + "from_sitemaps": result.state.discovered_from_sitemaps, + "user_stopped": result.user_stopped, + "max_pages": result.max_pages, + "workers": result.workers, + }, + "output_path": str(result.output_path), + "state_path": str(result.state_path), + "log_path": str(result.log_path), + "stdout": captured_stdout.getvalue(), + } + + result_data = st.session_state.get("sitemap_result") + if not result_data: + st.info("Run a crawl to generate a sitemap CSV.") + return + + summary = result_data["summary"] + csv_path = Path(result_data["output_path"]) + state_path = Path(result_data["state_path"]) + log_path = Path(result_data["log_path"]) + + st.subheader("Crawl Summary") + metric_cols = st.columns(6) + metric_cols[0].metric("URLs Found", summary["records"]) + metric_cols[1].metric("Visited", summary["visited"]) + metric_cols[2].metric("Queued", summary["queued"]) + metric_cols[3].metric("XML Seeded", summary["from_sitemaps"]) + metric_cols[4].metric("Errors", summary["errors"]) + metric_cols[5].metric("Skipped", summary["skipped"]) + + status_text = "Stopped by user." if summary["user_stopped"] else "Run completed." 
+ st.caption(f"{status_text} Max pages used: {summary['max_pages']} | Worker threads: {summary['workers']}") + + if csv_path.exists(): + csv_bytes = csv_path.read_bytes() + st.download_button( + "Download Sitemap CSV", + data=csv_bytes, + file_name=csv_path.name, + mime="text/csv", + ) + preview_rows = read_csv_preview(csv_bytes) + if preview_rows: + st.dataframe(preview_rows, width="stretch", hide_index=True) + + file_cols = st.columns(2) + with file_cols[0]: + if state_path.exists(): + st.download_button( + "Download Crawl State", + data=state_path.read_bytes(), + file_name=state_path.name, + mime="application/json", + ) + with file_cols[1]: + if log_path.exists(): + st.download_button( + "Download Crawl Log", + data=log_path.read_bytes(), + file_name=log_path.name, + mime="text/plain", + ) + + crawl_output = (result_data.get("stdout") or "").strip() + if crawl_output: + st.text_area("Crawler Output", value=crawl_output, height=220, disabled=True) + + if log_path.exists(): + log_text = log_path.read_text(encoding="utf-8", errors="replace") + st.text_area("Log Tail", value="\n".join(log_text.splitlines()[-50:]), height=220, disabled=True) + + +def main() -> None: + st.set_page_config(page_title="WDW Tools", layout="wide") + st.header("WDW Sitemap And Import Tools") + sitemap_tab, importer_tab = st.tabs(["Sitemap Generator", "Page Importer"]) + + with sitemap_tab: + render_sitemap_tab() + + with importer_tab: + page_importer_app = get_page_importer_module() + page_importer_app.render_app() + + +if __name__ == "__main__": + main() diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..5ca142a --- /dev/null +++ b/requirements.txt @@ -0,0 +1,4 @@ +streamlit>=1.43,<2 +requests>=2.32,<3 +beautifulsoup4>=4.12,<5 +python-dateutil>=2.9,<3