first commit
Build Docker Image / docker (push) Successful in 44s

This commit is contained in:
2026-04-09 10:42:10 -07:00
commit ead872a0a5
19 changed files with 2783 additions and 0 deletions
+12
View File
@@ -0,0 +1,12 @@
.venv/
__pycache__/
*.py[cod]
*$py.class
.pytest_cache/
.mypy_cache/
.ruff_cache/
.streamlit/secrets.toml
*.log
+63
View File
@@ -0,0 +1,63 @@
# Page Importer
This folder contains the WordPress import tool used by the combined application in the repository root.
The importer still uses Streamlit internally, but it is now rendered as the `Page Importer` tab inside the shared app rather than being the main entrypoint for the repository.
## Features
- Upload a CSV of submitted URLs
- Choose the URL column and optional title override column
- Optionally map post type from the CSV or force a single post type
- Scrape only the listed URLs
- Extract title, publish date, author, body HTML, categories, and tags
- Retry failed rows
- Export a WordPress WXR XML file
## Recommended Usage
Run the root application:
```bash
streamlit run ../app.py
```
Or run the combined Docker container from the repository root.
## Standalone Usage
If you need to run this importer by itself:
```bash
python3 -m venv .venv
source .venv/bin/activate
pip install -r requirements.txt
streamlit run app.py
```
On Windows PowerShell:
```powershell
python -m venv .venv
.venv\Scripts\Activate.ps1
pip install -r requirements.txt
streamlit run app.py
```
## CSV Input
The app accepts CSV files with any columns. You choose:
- the URL column to scrape
- an optional title or name column to override the scraped title
- an optional post type column with values like `post` or `page`
- an optional category column whose values are appended during export
You can also add manual categories in the sidebar to append them to every exported item.
## Notes
- Exported posts default to `draft` unless changed in the UI
- Image and link URLs remain pointed at the source site
- Some themes need heuristic fallback: the `Force heuristic scraping` option skips JSON-LD-first extraction and relies on page structure instead.
- In the combined app, dependencies come from the root `requirements.txt`
+475
View File
@@ -0,0 +1,475 @@
from __future__ import annotations
import csv
import datetime as dt
import io
import re
from dataclasses import replace
import streamlit as st
from page_importer.dates import parse_datetime
from page_importer.models import ScrapeOptions, ScrapedPost
from page_importer.scraper import Scraper
from page_importer.wxr import build_wxr
def load_csv(file_data: bytes) -> tuple[list[str], list[dict[str, str]]]:
    """Decode uploaded CSV bytes and return ``(header names, row dicts)``.

    The bytes are decoded as UTF-8; a leading BOM is dropped and undecodable
    bytes are replaced rather than raising. An input with no header line
    yields an empty header list.
    """
    decoded = file_data.decode("utf-8-sig", errors="replace")
    reader = csv.DictReader(io.StringIO(decoded))
    records = [record for record in reader]
    header_names = reader.fieldnames
    if not header_names:
        return [], records
    return header_names, records
def render_app() -> None:
    """Render the Page Importer UI: options, CSV upload, scraping, preview, export.

    Scrape results, the rows they were scraped from, the headers of that CSV,
    and the scrape context are kept in ``st.session_state`` so they survive
    Streamlit reruns. The export sidebar is always driven by the stored rows
    and headers, so that per-row category lookups keep matching the stored
    results even if a different CSV is uploaded afterwards.
    """
    st.title("Page Importer")
    st.caption("Scrape blog posts from CSV URLs and export a WordPress WXR file.")

    with st.sidebar:
        st.header("Options")
        include_author = st.checkbox("Include author", value=True)
        include_categories = st.checkbox("Include categories", value=True)
        include_tags = st.checkbox("Include tags", value=True)
        force_heuristics = st.checkbox("Force heuristic scraping", value=False)
        test_run = st.checkbox(
            "Test run only",
            value=False,
            help="Scrape only the first 10 rows that contain a URL.",
        )
        post_type_mode = st.selectbox(
            "WordPress post type mode",
            ["Single type for all rows", "Use a CSV column"],
            index=0,
        )
        default_post_type = st.selectbox("Default WordPress post type", ["post", "page"], index=0)

    uploaded = st.file_uploader("Upload CSV", type=["csv"])
    if not uploaded:
        st.info("Upload a CSV to begin.")
        return

    headers, rows = load_csv(uploaded.getvalue())
    if not rows:
        st.error("The CSV did not contain any rows.")
        return

    col1, col2, col3 = st.columns(3)
    with col1:
        url_column = st.selectbox("URL column", headers, index=_safe_index(headers, ["url", "link"]))
    with col2:
        title_column = st.selectbox(
            "Optional title override column",
            ["(none)", *headers],
            index=_safe_index(["(none)", *headers], ["name", "title"]),
        )
    with col3:
        post_type_column = st.selectbox(
            "Optional post type column",
            ["(none)", *headers],
            index=_safe_index(["(none)", *headers], ["post_type", "type"]),
            disabled=post_type_mode != "Use a CSV column",
        )

    st.write(f"Loaded {len(rows)} row(s). Only the selected URL column will be scraped.")
    if test_run:
        st.caption("Test run is enabled. Only the first 10 rows with a URL will be scraped.")

    if st.button("Scrape URLs", type="primary"):
        context = build_scrape_context(
            include_author=include_author,
            include_categories=include_categories,
            include_tags=include_tags,
            force_heuristics=force_heuristics,
            test_run=test_run,
            post_type_mode=post_type_mode,
            post_type_column=post_type_column,
            default_post_type=default_post_type,
            url_column=url_column,
            title_column=title_column,
        )
        results = scrape_rows(rows, context, phase_label="Scraping")
        st.session_state["results"] = results
        # Remember exactly which CSV the results belong to; retries and the
        # export step must index into these rows, not a later upload.
        st.session_state["input_rows"] = rows
        st.session_state["input_headers"] = headers
        st.session_state["scrape_context"] = context

    results = st.session_state.get("results", [])
    if not results:
        return

    successful = [post for post in results if post.success]
    failed = [post for post in results if not post.success]
    st.subheader("Results")
    st.write(f"Successful: {len(successful)} | Failed: {len(failed)}")

    if failed and st.button("Retry failed items"):
        stored_rows = st.session_state.get("input_rows", rows)
        context = st.session_state.get("scrape_context")
        if context:
            retried = scrape_rows(
                stored_rows,
                context,
                row_numbers=[post.row_number for post in failed if post.row_number],
                phase_label="Retrying",
            )
            results = merge_retry_results(results, retried)
            st.session_state["results"] = results
            successful = [post for post in results if post.success]
            failed = [post for post in results if not post.success]

    preview_rows = [
        {
            "Row": post.row_number,
            "URL": post.source_url,
            "CMS": post.cms,
            "Success": post.success,
            "Title": post.title,
            "Publish Date": post.publish_date,
            "Author": post.author,
            "Categories": ", ".join(post.categories),
            "Tags": ", ".join(post.tags),
            "Post Type": post.post_type,
            "Error": post.error,
        }
        for post in results
    ]
    st.dataframe(
        preview_rows,
        width="stretch",
        hide_index=True,
        column_config={
            "Row": st.column_config.NumberColumn(width="small"),
            "URL": st.column_config.TextColumn(width="medium"),
            "Title": st.column_config.TextColumn(width="medium"),
            "Publish Date": st.column_config.TextColumn(width="medium"),
            "Categories": st.column_config.TextColumn(width="medium"),
            "Tags": st.column_config.TextColumn(width="medium"),
            "Error": st.column_config.TextColumn(width="large"),
        },
    )

    if failed:
        selected_failed = st.selectbox(
            "Failed row details",
            failed,
            format_func=lambda post: f"Row {post.row_number}: {post.source_url or '(missing URL)'}",
        )
        st.text_area(
            "Error details",
            value=selected_failed.error_details or selected_failed.error,
            height=180,
            disabled=True,
        )

    if successful:
        selected_index = st.number_input(
            "Preview successful row",
            min_value=1,
            max_value=len(successful),
            value=1,
            step=1,
        )
        selected = successful[selected_index - 1]
        st.markdown("### Content Preview")
        st.write(f"**Title:** {selected.title}")
        st.write(f"**Source URL:** {selected.source_url}")
        st.write(f"**Publish Date:** {selected.publish_date or '(missing)'}")
        st.write(f"**Author:** {selected.author or '(missing)'}")
        st.write(f"**Post Type:** {selected.post_type}")
        st.write(selected.body_html, unsafe_allow_html=True)

    # Export against the CSV the results were scraped from; fall back to the
    # current upload only when nothing has been stored yet.
    export_rows = st.session_state.get("input_rows", rows)
    export_headers = st.session_state.get("input_headers", headers)
    render_export_sidebar(successful, export_rows, export_headers)
def build_scrape_context(
    *,
    include_author: bool,
    include_categories: bool,
    include_tags: bool,
    force_heuristics: bool,
    test_run: bool,
    post_type_mode: str,
    post_type_column: str,
    default_post_type: str,
    url_column: str,
    title_column: str,
) -> dict[str, object]:
    """Bundle the UI selections into the context dict consumed by ``scrape_rows``.

    The four extraction toggles are wrapped in a ``ScrapeOptions`` stored under
    ``"options"``; every other argument is stored under its own name.
    """
    scrape_options = ScrapeOptions(
        include_author=include_author,
        include_categories=include_categories,
        include_tags=include_tags,
        force_heuristics=force_heuristics,
    )
    context: dict[str, object] = {"options": scrape_options}
    context.update(
        test_run=test_run,
        post_type_mode=post_type_mode,
        post_type_column=post_type_column,
        default_post_type=default_post_type,
        url_column=url_column,
        title_column=title_column,
    )
    return context
def scrape_rows(
    rows: list[dict[str, str]],
    context: dict[str, object],
    row_numbers: list[int] | None = None,
    phase_label: str = "Scraping",
) -> list[ScrapedPost]:
    """Scrape the selected CSV rows and return one ``ScrapedPost`` per target row.

    Args:
        rows: All CSV rows; row numbers are 1-based positions in this list.
        context: Dict produced by ``build_scrape_context``.
        row_numbers: When given, only these 1-based rows are scraped (retry).
            Otherwise, if ``context["test_run"]`` is truthy, only the first
            10 rows that contain a URL are scraped.
        phase_label: Prefix for the progress/status messages.

    Returns:
        Posts in target order; rows without a URL yield a failed post.

    Raises:
        TypeError: If ``context["options"]`` is not a ``ScrapeOptions``.
    """
    options = context["options"]
    if not isinstance(options, ScrapeOptions):
        raise TypeError("Invalid scrape options in session state.")

    # Resolve the column name once so filtering and scraping agree on it.
    url_column = str(context["url_column"])

    targets = list(enumerate(rows, start=1))
    if row_numbers is not None:
        requested_rows = set(row_numbers)
        targets = [(row_number, row) for row_number, row in targets if row_number in requested_rows]
    elif bool(context.get("test_run")):
        targets = [
            (row_number, row)
            for row_number, row in targets
            if (row.get(url_column) or "").strip()
        ][:10]

    results: list[ScrapedPost] = []
    progress = st.progress(0.0)
    status = st.empty()
    total = len(targets) or 1  # avoid dividing by zero when nothing is targeted

    scraper = Scraper(options)
    try:
        for index, (row_number, row) in enumerate(targets, start=1):
            url = (row.get(url_column) or "").strip()
            status.write(f"{phase_label} {index}/{len(targets)}: {url or f'row {row_number} has no URL'}")
            if url:
                post = scraper.scrape(url)
            else:
                post = ScrapedPost(
                    source_url="",
                    row_number=row_number,
                    error="Missing URL in the selected URL column.",
                    error_details=f"Row {row_number} does not contain a URL in column '{url_column}'.",
                )
            post.row_number = row_number
            apply_row_overrides(post, row, context)
            results.append(post)
            progress.progress(index / total)
    finally:
        # Each run builds a fresh Scraper; release its HTTP connection pool
        # instead of leaking one session per scrape/retry.
        session = getattr(scraper, "session", None)
        if session is not None:
            session.close()

    status.write(f"{phase_label} complete.")
    return results
def apply_row_overrides(post: ScrapedPost, row: dict[str, str], context: dict[str, object]) -> None:
    """Apply per-row CSV overrides (title and post type) to ``post`` in place.

    The title is replaced only when a title column is selected and the row's
    cell contains non-whitespace text; a blank or whitespace-only cell keeps
    the scraped title. The post type is always re-resolved from the context.
    """
    title_column = context["title_column"]
    if isinstance(title_column, str) and title_column != "(none)":
        # Strip before testing so a cell of only spaces cannot blank the title.
        override = (row.get(title_column) or "").strip()
        if override:
            post.title = override
    post.post_type = resolve_post_type(
        row=row,
        mode=str(context["post_type_mode"]),
        column=str(context["post_type_column"]),
        default_value=str(context["default_post_type"]),
    )
def resolve_export_categories(
    row: dict[str, str],
    category_column: str,
    manual_categories: list[str],
) -> list[str]:
    """Return the row's CSV categories followed by the manual ones, de-duplicated.

    No CSV categories are read when ``category_column`` is ``"(none)"``.
    """
    if category_column == "(none)":
        from_csv: list[str] = []
    else:
        from_csv = parse_terms(row.get(category_column, ""))
    return merge_unique_terms(from_csv, manual_categories)
def parse_terms(value: str) -> list[str]:
    """Split ``value`` on ``,``, ``|`` or ``>`` and return the non-empty trimmed parts."""
    pieces = (piece.strip() for piece in re.split(r"[,|>]", value or ""))
    return [piece for piece in pieces if piece]
def merge_unique_terms(*groups: list[str]) -> list[str]:
    """Concatenate term groups, trimming each term and keeping first occurrences only."""
    ordered: dict[str, None] = {}
    for group in groups:
        for term in group:
            key = term.strip()
            if key:
                # dicts keep insertion order, so the first occurrence wins.
                ordered.setdefault(key, None)
    return list(ordered)
def merge_retry_results(existing: list[ScrapedPost], replacements: list[ScrapedPost]) -> list[ScrapedPost]:
    """Swap retried posts into ``existing`` by row number and return them sorted by row."""
    by_row = {}
    for post in replacements:
        by_row[post.row_number] = post

    merged = []
    for post in existing:
        merged.append(by_row.get(post.row_number, post))

    # Posts without a row number sort first.
    merged.sort(key=lambda post: post.row_number or 0)
    return merged
def build_export_posts(
    posts: list[ScrapedPost],
    rows: list[dict[str, str]],
    category_column: str,
    manual_categories: list[str],
    post_status: str,
    custom_post_type_slug: str,
) -> list[ScrapedPost]:
    """Return copies of ``posts`` with the export-time status, type and categories applied.

    Each copy gets ``post_status``, the custom post type slug when one is set
    (otherwise its own post type), and its scraped categories extended with
    the CSV/manual categories resolved for its source row. The input posts
    are not modified.
    """

    def source_row(row_number: int) -> dict[str, str]:
        # Row numbers are 1-based; anything out of range has no CSV row.
        if 0 < row_number <= len(rows):
            return rows[row_number - 1]
        return {}

    return [
        replace(
            post,
            status=post_status,
            post_type=custom_post_type_slug or post.post_type,
            categories=merge_unique_terms(
                post.categories,
                resolve_export_categories(source_row(post.row_number), category_column, manual_categories),
            ),
        )
        for post in posts
    ]
def render_export_sidebar(
    successful: list[ScrapedPost],
    rows: list[dict[str, str]],
    headers: list[str],
) -> None:
    """Render the sidebar export controls and the WXR download button.

    Args:
        successful: Successfully scraped posts that may be exported.
        rows: CSV rows used to look up per-row export categories.
        headers: CSV column names offered for the category column picker.
    """
    category_choices = ["(none)", *headers]

    with st.sidebar:
        st.markdown("---")
        st.subheader("Export")

        post_status = st.selectbox(
            "Imported post status",
            ["draft", "publish", "private"],
            index=0,
            key="export_post_status",
        )
        category_column = st.selectbox(
            "CSV category column",
            category_choices,
            index=_safe_index(category_choices, ["category", "categories", "department"]),
            key="export_category_column",
        )
        manual_text = st.text_input(
            "Additional export categories",
            value="",
            help="Comma-separated categories to append to every exported item.",
            key="export_manual_categories",
        )
        manual_categories = parse_terms(manual_text)
        output_name = st.text_input(
            "Output filename",
            value="wordpress-import.xml",
            key="export_output_name",
        )
        slug_text = st.text_input(
            "Custom post type slug",
            value="",
            help="Optional. If set, all exported items will use this WordPress post type slug.",
            key="export_custom_post_type_slug",
        )
        custom_post_type_slug = normalize_post_type_slug(slug_text)

        export_posts = build_export_posts(
            successful,
            rows,
            category_column,
            manual_categories,
            post_status,
            custom_post_type_slug,
        )
        if custom_post_type_slug:
            st.caption(f"Exporting all items as post type `{custom_post_type_slug}`.")

        # Pair every post that has a parseable publish date with that date.
        dated_export_posts = []
        for post in export_posts:
            publish_date = parse_publish_date(post.publish_date)
            if publish_date:
                dated_export_posts.append((post, publish_date))

        if not dated_export_posts:
            st.caption("No successful items have a publish date, so export date filtering is unavailable.")
        else:
            all_dates = [publish_date for _, publish_date in dated_export_posts]
            min_date = min(all_dates)
            max_date = max(all_dates)
            filter_by_publish_date = st.checkbox(
                "Filter export by publish date",
                value=False,
                key="export_filter_by_publish_date",
            )
            if filter_by_publish_date:
                export_start = st.date_input(
                    "Export start date",
                    value=min_date,
                    min_value=min_date,
                    max_value=max_date,
                    format="MM/DD/YYYY",
                    key="export_start_date",
                )
                export_end = st.date_input(
                    "Export end date",
                    value=max_date,
                    min_value=min_date,
                    max_value=max_date,
                    format="MM/DD/YYYY",
                    key="export_end_date",
                )
                if export_start > export_end:
                    st.error("Export start date must be on or before the end date.")
                    export_posts = []
                else:
                    # Undated posts are absent from the pairs, so they drop out here.
                    export_posts = [
                        post
                        for post, publish_date in dated_export_posts
                        if export_start <= publish_date <= export_end
                    ]
                    st.caption(
                        "Date filter: "
                        f"{export_start.strftime('%m/%d/%Y')} to {export_end.strftime('%m/%d/%Y')}."
                    )
                    undated_count = len(successful) - len(dated_export_posts)
                    if undated_count:
                        st.caption(f"Excluded {undated_count} successful item(s) with no publish date.")

        st.caption(f"Ready to export {len(export_posts)} post(s).")
        xml_data = build_wxr(export_posts)
        st.download_button(
            label="Download WXR XML",
            data=xml_data,
            file_name=output_name,
            mime="application/xml",
            disabled=not export_posts,
        )
def parse_publish_date(value: str) -> dt.date | None:
    """Return the calendar date parsed from ``value``, or ``None`` if it cannot be parsed."""
    moment = parse_datetime(value)
    return moment.date() if moment is not None else None
def _safe_index(values: list[str], candidates: list[str]) -> int:
lowered = {value.lower(): idx for idx, value in enumerate(values)}
for candidate in candidates:
if candidate in lowered:
return lowered[candidate]
return 0
def resolve_post_type(
    row: dict[str, str],
    mode: str,
    column: str,
    default_value: str,
) -> str:
    """Pick the post type for a row.

    The CSV cell is used only in ``"Use a CSV column"`` mode with a real
    column selected, and only if it normalizes to a non-empty slug; in every
    other case ``default_value`` is returned.
    """
    reads_from_csv = mode == "Use a CSV column" and column != "(none)"
    if not reads_from_csv:
        return default_value
    slug = normalize_post_type_slug(row.get(column) or "")
    return slug or default_value
def normalize_post_type_slug(value: str) -> str:
    """Trim and lower-case ``value``, then drop every character outside ``a-z0-9_-``."""
    lowered = (value or "").strip().lower()
    return re.sub(r"[^a-z0-9_-]", "", lowered)
if __name__ == "__main__":
    # Standalone entrypoint: configure the page, then render the importer.
    st.set_page_config(page_title="Page Importer", layout="wide")
    render_app()
+1
View File
@@ -0,0 +1 @@
+26
View File
@@ -0,0 +1,26 @@
from __future__ import annotations
import datetime as dt
from dateutil import parser as date_parser
def parse_datetime(value: str | None) -> dt.datetime | None:
    """Parse ``value`` into a datetime, or return ``None`` when that is not possible.

    A strict parse is attempted first; if it fails, a fuzzy parse is tried
    before giving up.
    """
    if not value:
        return None

    recoverable = (TypeError, ValueError, OverflowError)
    try:
        return date_parser.parse(value)
    except recoverable:
        pass

    try:
        return date_parser.parse(value, fuzzy=True)
    except recoverable:
        return None
def normalize_date(value: str | None) -> str:
    """Format ``value`` as a date-time string, or return ``""`` if it cannot be parsed.

    Values without a usable UTC offset are rendered as ``YYYY-MM-DD HH:MM:SS``;
    offset-aware values use ISO format with a space separator and seconds
    precision.
    """
    parsed = parse_datetime(value)
    if parsed is None:
        return ""

    has_offset = parsed.tzinfo is not None and parsed.utcoffset() is not None
    if has_offset:
        return parsed.isoformat(sep=" ", timespec="seconds")
    return parsed.strftime("%Y-%m-%d %H:%M:%S")
+34
View File
@@ -0,0 +1,34 @@
from __future__ import annotations
from dataclasses import dataclass, field
@dataclass
class ScrapeOptions:
    """Settings that control how a page is fetched and which fields are extracted."""

    # Field toggles: when off, the matching data is not extracted.
    include_author: bool = True
    include_categories: bool = True
    include_tags: bool = True
    # Skip JSON-LD extraction and rely on page structure only.
    force_heuristics: bool = False
    # Passed as the ``timeout`` of each HTTP request.
    request_timeout: int = 20
    # Sent as the User-Agent header of every request.
    user_agent: str = (
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
        "AppleWebKit/537.36 (KHTML, like Gecko) "
        "Chrome/135.0 Safari/537.36"
    )
@dataclass
class ScrapedPost:
    """Result of scraping one URL: extracted content plus success/error state."""

    source_url: str
    # 1-based CSV row the URL came from; 0 until assigned.
    row_number: int = 0
    cms: str = "unknown"

    # Extracted content.
    title: str = ""
    publish_date: str = ""
    author: str = ""
    body_html: str = ""
    categories: list[str] = field(default_factory=list)
    tags: list[str] = field(default_factory=list)

    # Export settings.
    status: str = "draft"
    post_type: str = "post"

    # Outcome of the scrape.
    success: bool = False
    error: str = ""
    error_details: str = ""
+555
View File
@@ -0,0 +1,555 @@
from __future__ import annotations
import json
import re
import traceback
from html import unescape
from typing import Iterable
import requests
from bs4 import BeautifulSoup
from bs4.element import NavigableString, Tag
from page_importer.dates import normalize_date
from page_importer.models import ScrapeOptions, ScrapedPost
# Lower-cased JSON-LD ``@type`` values accepted as an article payload.
JSON_ARTICLE_TYPES = {
    "article",
    "blogposting",
    "newsarticle",
    "report",
    "webpage",
}

# CSS selectors tried in order when locating the body container.
BODY_SELECTORS = [
    "article .entry-content",
    "article .post-content",
    "article .node__content",
    "article .node .content",
    "article .node-content",
    "article .field-name-body .field-item",
    "article .field-name-body",
    "article .field--name-body",
    "article .article-body",
    "article .content",
    ".post-content",
    ".entry-content",
    ".node__content",
    ".node .content",
    ".node-content",
    ".field-name-body .field-item",
    ".field-name-body",
    ".field--name-body",
    ".article-body",
    "#content-area .node .content",
    "article",
    "main article",
    "main",
]

# CSS selectors whose matching nodes supply category names.
CATEGORY_SELECTORS = [
    ".cat-links a",
    ".post-categories a",
    ".field--name-field-category a",
    ".tags a[rel='category tag']",
    ".terms a",
    ".taxonomy a",
]

# CSS selectors whose matching nodes supply tag names.
TAG_SELECTORS = [
    ".tags-links a",
    ".post-tags a",
    ".field--name-field-tags a",
    "a[rel='tag']",
    ".terms a",
]

# CSS selectors tried in order when looking for the author.
AUTHOR_SELECTORS = [
    "[rel='author']",
    ".author a",
    ".byline a",
    ".submitted a",
    ".node__submitted a",
    ".node-info a",
    ".createdby",
]

# CSS selectors tried in order when looking for the publish date.
DATE_SELECTORS = [
    "time[datetime]",
    "meta[property='article:published_time']",
    "meta[name='publish_date']",
    "meta[name='pubdate']",
    ".date-display-single",
    ".submitted",
    ".node-info",
]

# Matches dates written like "Monday, April 6, 2026".
DRUPAL_TITLE_DATE_PATTERN = re.compile(
    r"(Monday|Tuesday|Wednesday|Thursday|Friday|Saturday|Sunday),\s+"
    r"([A-Za-z]+)\s+\d{1,2},\s+\d{4}"
)
class Scraper:
    """Fetch pages over one shared HTTP session and extract a ``ScrapedPost`` from each."""

    def __init__(self, options: ScrapeOptions) -> None:
        """Store ``options`` and open a session that sends the configured User-Agent."""
        self.options = options
        self.session = requests.Session()
        self.session.headers.update({"User-Agent": options.user_agent})

    def scrape(self, url: str) -> ScrapedPost:
        """Fetch ``url`` and return the extracted post.

        This method does not raise for fetch or extraction problems: any
        exception is recorded on the returned post (``error`` and
        ``error_details``) and ``success`` stays ``False``. A post counts as
        successful only when both a title and body HTML were extracted.
        """
        post = ScrapedPost(source_url=url)
        response: requests.Response | None = None
        try:
            response = self.session.get(url, timeout=self.options.request_timeout)
            response.raise_for_status()

            # Parse the raw bytes instead of ``response.text``: without a
            # charset in the Content-Type header, requests decodes text/*
            # as ISO-8859-1, which garbles UTF-8 pages. Only a charset the
            # server actually declared is passed on; otherwise the parser
            # detects the encoding from the document itself.
            content_type = response.headers.get("Content-Type", "")
            declared_encoding = response.encoding if "charset" in content_type.lower() else None
            soup = BeautifulSoup(response.content, "html.parser", from_encoding=declared_encoding)

            post.cms = detect_cms(soup)
            article_data = extract_article_json_ld(soup)
            if article_data and not self.options.force_heuristics:
                apply_article_data(post, article_data, soup, self.options)
            merge_fallback_data(post, soup, self.options)
            post.body_html = sanitize_html(post.body_html)

            required = {"title": post.title, "body_html": post.body_html}
            missing_fields = [name for name, value in required.items() if not value]
            if missing_fields:
                raise ValueError(
                    "Unable to extract required field(s): "
                    f"{', '.join(missing_fields)}. "
                    f"Detected CMS: {post.cms}. "
                    f"Publish date found: {'yes' if post.publish_date else 'no'}. "
                    f"Author found: {'yes' if post.author else 'no'}."
                )
            post.success = True
            return post
        except Exception as exc:
            # Deliberate catch-all boundary: one bad URL must not abort the batch.
            post.error = format_error_summary(url, exc, response, self.options.request_timeout)
            post.error_details = format_error_details(url, exc, response)
            return post
def detect_cms(soup: BeautifulSoup) -> str:
    """Guess the CMS as ``"wordpress"``, ``"drupal"``, ``"joomla"`` or ``"unknown"``.

    The ``generator`` meta tag is checked first; if it names none of the
    three, well-known markers in the serialized page markup are used.
    """
    generator = meta_content(soup, "meta", {"name": "generator"}).lower()
    for cms_name in ("wordpress", "drupal", "joomla"):
        if cms_name in generator:
            return cms_name

    markup = str(soup).lower()
    if "/wp-content/" in markup:
        return "wordpress"
    if "drupal-settings-json" in markup or "sites/default/files" in markup:
        return "drupal"
    if "com_content" in markup or "joomla" in markup:
        return "joomla"
    return "unknown"
def extract_article_json_ld(soup: BeautifulSoup) -> dict | None:
    """Return the first article-like JSON-LD object found in the page, or ``None``."""
    for script in soup.select("script[type='application/ld+json']"):
        raw = script.string
        if not raw:
            raw = script.get_text(" ", strip=True)
        if not raw:
            continue
        matches = (find_article_payload(payload) for payload in parse_json_candidates(raw))
        article = next((match for match in matches if match), None)
        if article:
            return article
    return None
def parse_json_candidates(raw: str) -> Iterable[dict | list]:
    """Yield the decoded JSON document for ``raw``, or nothing if it cannot be decoded.

    If ``raw`` does not decode as-is, runs of control characters are replaced
    with a single space and decoding is attempted once more.
    """
    try:
        decoded = json.loads(raw)
    except json.JSONDecodeError:
        pass
    else:
        yield decoded
        return

    sanitized = re.sub(r"[\x00-\x1f]+", " ", raw).strip()
    try:
        decoded = json.loads(sanitized)
    except json.JSONDecodeError:
        return
    yield decoded
def find_article_payload(payload: dict | list) -> dict | None:
    """Depth-first search of a JSON-LD payload for the first article-typed object.

    Lists are searched item by item. For a dict, its ``@graph`` is searched
    before the dict's own ``@type`` is compared (case-insensitively) against
    ``JSON_ARTICLE_TYPES``.
    """
    if isinstance(payload, list):
        matches = (find_article_payload(item) for item in payload)
        return next((match for match in matches if match), None)

    if not isinstance(payload, dict):
        return None

    if "@graph" in payload:
        nested = find_article_payload(payload["@graph"])
        if nested:
            return nested

    node_type = payload.get("@type")
    if isinstance(node_type, str):
        types = {node_type.lower()}
    else:
        types = {item.lower() for item in node_type or [] if isinstance(item, str)}

    return payload if types & JSON_ARTICLE_TYPES else None
def apply_article_data(
    post: ScrapedPost,
    article: dict,
    soup: BeautifulSoup,
    options: ScrapeOptions,
) -> None:
    """Copy fields from a JSON-LD article object onto ``post`` in place.

    Existing values on ``post`` are kept whenever the article has nothing for
    a field. Author, categories and tags are only read when enabled in
    ``options``.
    """
    headline = article.get("headline") or article.get("name")
    if headline:
        post.title = headline

    raw_date = article.get("datePublished") or article.get("dateCreated") or post.publish_date
    post.publish_date = normalize_date(raw_date)

    if options.include_author:
        author = extract_author_from_json_ld(article)
        if author:
            post.author = author

    if options.include_categories:
        sections = normalize_terms(article.get("articleSection"))
        if sections:
            post.categories = sections

    if options.include_tags:
        keywords = normalize_terms(article.get("keywords"))
        if keywords:
            post.tags = keywords

    body = extract_body_from_article(article, soup)
    if body:
        post.body_html = body
def merge_fallback_data(post: ScrapedPost, soup: BeautifulSoup, options: ScrapeOptions) -> None:
    """Fill fields still missing on ``post`` from the page markup, in place.

    Title, date, author, body and tags are only filled when empty. Categories
    found in the markup are always merged into the existing ones, and Drupal
    pages additionally contribute their "Department" values.
    """
    post.title = post.title or extract_title(soup)
    post.publish_date = post.publish_date or extract_date(soup, post.cms)

    if options.include_author and not post.author:
        post.author = extract_author(soup)

    post.body_html = post.body_html or extract_body(soup)

    if options.include_categories:
        post.categories = merge_terms(post.categories, extract_terms(soup, CATEGORY_SELECTORS))
        if post.cms == "drupal":
            post.categories = merge_terms(post.categories, extract_drupal_department_categories(soup))

    if options.include_tags and not post.tags:
        post.tags = extract_terms(soup, TAG_SELECTORS)
def extract_title(soup: BeautifulSoup) -> str:
    """Return the page title, or ``""`` when none can be found.

    Sources are tried in order: the ``og:title`` meta tag, a list of heading
    selectors, then the document ``<title>``. A source that yields only
    whitespace is skipped, so an empty heading no longer hides a usable
    later one. Every result is whitespace-normalized.
    """
    og_title = clean_text(meta_content(soup, "meta", {"property": "og:title"}))
    if og_title:
        return og_title

    for selector in ("article h1", "h1.entry-title", "h1.page-title", "h1.title", "h1"):
        node = soup.select_one(selector)
        if node:
            heading = clean_text(node.get_text(" ", strip=True))
            if heading:
                return heading

    return clean_text(soup.title.get_text(" ", strip=True)) if soup.title else ""
def extract_date(soup: BeautifulSoup, cms: str = "unknown") -> str:
    """Return the first normalizable publish date found in the markup, else ``""``.

    For each date selector the node's ``datetime`` attribute, ``content``
    attribute, or text is used, in that order. Drupal pages fall back to a
    date located next to the title.
    """
    for selector in DATE_SELECTORS:
        node = soup.select_one(selector)
        if node is None:
            continue
        raw_value = node.get("datetime") or node.get("content") or node.get_text(" ", strip=True)
        normalized = normalize_date(raw_value)
        if normalized:
            return normalized

    return extract_drupal_title_adjacent_date(soup) if cms == "drupal" else ""
def extract_author(soup: BeautifulSoup) -> str:
    """Return the author name from the markup, or ``""`` when none is found.

    The ``author`` meta tag is tried first, then each author selector in
    order. A match whose text is empty is skipped so that later selectors
    still get a chance.
    """
    author = clean_text(meta_content(soup, "meta", {"name": "author"}))
    if author:
        return author

    for selector in AUTHOR_SELECTORS:
        node = soup.select_one(selector)
        if node:
            name = clean_text(node.get_text(" ", strip=True))
            if name:
                return name
    return ""
def extract_body(soup: BeautifulSoup) -> str:
    """Return the inner HTML of the best body container, or ``""``.

    Selectors are tried in order. The first container with at least 120
    characters of text wins outright; otherwise the first container with any
    meaningful content is returned as a fallback.
    """
    first_meaningful = ""
    for selector in BODY_SELECTORS:
        node = soup.select_one(selector)
        if node is None:
            continue

        # Work on a copy so stripping unwanted elements leaves ``soup`` intact.
        working_copy = clone_tag(node)
        strip_unwanted(working_copy)
        inner_html = working_copy.decode_contents().strip()

        visible_text = BeautifulSoup(inner_html, "html.parser").get_text(" ", strip=True)
        if len(visible_text) >= 120:
            return inner_html

        if not first_meaningful and has_meaningful_body_content(inner_html):
            first_meaningful = inner_html
    return first_meaningful
def extract_terms(soup: BeautifulSoup, selectors: list[str]) -> list[str]:
    """Collect the unique, non-empty text of every node matching ``selectors``, in order."""
    seen: dict[str, None] = {}
    for selector in selectors:
        for node in soup.select(selector):
            label = clean_text(node.get_text(" ", strip=True))
            if label:
                seen.setdefault(label, None)
    return list(seen)
def extract_drupal_title_adjacent_date(soup: BeautifulSoup) -> str:
    """Find a weekday-style date placed next to the page title, or return ``""``.

    The title's following siblings are checked first. If none holds a date,
    the text of the nearest enclosing header/div/section is checked, with the
    title text removed from its start.
    """
    title_node = find_title_node(soup)
    if not title_node:
        return ""

    for sibling in title_node.next_siblings:
        found = normalize_drupal_date(text_from_node(sibling))
        if found:
            return found

    container = title_node.find_parent(["header", "div", "section"])
    if not container:
        return ""

    container_text = clean_text(container.get_text(" ", strip=True))
    title_text = clean_text(title_node.get_text(" ", strip=True))
    if title_text and container_text.startswith(title_text):
        container_text = clean_text(container_text[len(title_text):])
    return normalize_drupal_date(container_text)
def extract_drupal_department_categories(soup: BeautifulSoup) -> list[str]:
    """Collect "Department:" values from the page as a de-duplicated category list.

    Two passes are made: text nodes that consist solely of the label (the
    value is taken from the label's parent text or, failing that, the first
    usable following sibling), then common inline elements whose text starts
    with the label.
    """
    found: list[str] = []
    label_only = re.compile(r"^\s*Department:\s*$", re.IGNORECASE)

    # Pass 1: the label sits in its own text node.
    for label_node in soup.find_all(string=label_only):
        parent = label_node.parent
        if not isinstance(parent, Tag) or not parent:
            continue

        inline = normalize_department_category(
            extract_labeled_value(parent.get_text(" ", strip=True), "Department")
        )
        if inline:
            found = merge_terms(found, [inline])
            continue

        for sibling in parent.next_siblings:
            sibling_value = normalize_department_category(text_from_node(sibling))
            if sibling_value:
                found = merge_terms(found, [sibling_value])
                break

    # Pass 2: label and value share one element's text.
    for element in soup.find_all(["p", "li", "span", "dt", "dd"]):
        element_text = clean_text(element.get_text(" ", strip=True))
        if not element_text.lower().startswith("department:"):
            continue
        labeled = normalize_department_category(extract_labeled_value(element_text, "Department"))
        if labeled:
            found = merge_terms(found, [labeled])

    return found
def extract_author_from_json_ld(article: dict) -> str:
    """Return the author name(s) from a JSON-LD article, joined with ``", "``.

    ``author`` may be a string, an object with a ``name``, or a list mixing
    both. Entries that are neither, or whose name is not a string, are
    ignored rather than raising. Returns ``""`` when no name is available.
    """
    author = article.get("author")
    entries = author if isinstance(author, list) else [author]

    names: list[str] = []
    for entry in entries:
        if isinstance(entry, dict):
            entry = entry.get("name", "")
        if not isinstance(entry, str):
            continue
        name = clean_text(entry)
        if name:
            names.append(name)
    return ", ".join(names)
def extract_body_from_article(article: dict, soup: BeautifulSoup) -> str:
    """Return body HTML from the JSON-LD ``articleBody``, else from the page markup.

    ``articleBody`` is used only when it is a string of more than 120
    characters after trimming; it is entity-unescaped and wrapped in a single
    ``<p>``.
    """
    article_body = article.get("articleBody")
    if not isinstance(article_body, str):
        return extract_body(soup)

    trimmed = article_body.strip()
    if len(trimmed) <= 120:
        return extract_body(soup)
    return f"<p>{unescape(trimmed)}</p>"
def normalize_terms(value: object) -> list[str]:
    """Turn a JSON-LD term value into a list of cleaned term strings.

    A string is split on ``,``, ``|`` or ``>``. A list contributes its string
    items with duplicates removed. Any other value yields ``[]``.
    """
    if isinstance(value, str):
        cleaned_parts = (clean_text(part) for part in re.split(r"[,|>]", value))
        return [part for part in cleaned_parts if part]

    if isinstance(value, list):
        unique: dict[str, None] = {}
        for item in value:
            if not isinstance(item, str):
                continue
            cleaned = clean_text(item)
            if cleaned:
                unique.setdefault(cleaned, None)
        return list(unique)

    return []
def merge_terms(*groups: list[str]) -> list[str]:
    """Concatenate term groups, whitespace-normalizing terms and dropping blanks and repeats."""
    combined: dict[str, None] = {}
    for group in groups:
        for item in group:
            term = clean_text(item)
            if term:
                # Insertion-ordered dict: the first occurrence fixes the position.
                combined.setdefault(term, None)
    return list(combined)
def normalize_drupal_date(value: str | None) -> str:
    """Normalize the first "Weekday, Month D, YYYY" date inside ``value``, else ``""``."""
    found = DRUPAL_TITLE_DATE_PATTERN.search(value) if value else None
    if found is None:
        return ""
    return normalize_date(found.group(0))
def meta_content(soup: BeautifulSoup, tag_name: str, attrs: dict[str, str]) -> str:
    """Return the trimmed ``content`` attribute of the first matching tag, or ``""``."""
    node = soup.find(tag_name, attrs=attrs)
    if not node:
        return ""
    if not node.get("content"):
        return ""
    return node["content"].strip()
def clean_text(value: str) -> str:
    """Collapse each whitespace run to one space and trim the ends; falsy input gives ``""``."""
    text = value or ""
    collapsed = re.sub(r"\s+", " ", text)
    return collapsed.strip()
def text_from_node(node: object) -> str:
    """Return cleaned text for a string node or a tag; anything else gives ``""``."""
    if isinstance(node, NavigableString):
        raw_text = str(node)
    elif isinstance(node, Tag):
        raw_text = node.get_text(" ", strip=True)
    else:
        return ""
    return clean_text(raw_text)
def sanitize_html(html: str) -> str:
    """Return ``html`` with unwanted elements and dangerous attributes removed.

    Falsy input yields ``""``.
    """
    if not html:
        return ""
    fragment = BeautifulSoup(html, "html.parser")
    for scrub in (strip_unwanted, strip_dangerous_attributes):
        scrub(fragment)
    return fragment.decode_contents().strip()
def has_meaningful_body_content(html: str) -> bool:
    """Report whether ``html`` has visible text or an image/link/embed/object tag."""
    if not html:
        return False
    if BeautifulSoup(html, "html.parser").get_text(" ", strip=True):
        return True
    lowered = html.lower()
    return any(marker in lowered for marker in ("<img", "<a ", "<embed", "<object"))
def strip_unwanted(node: BeautifulSoup | Tag) -> None:
    """Remove script, style, form, navigation and share elements from ``node`` in place."""
    unwanted_selectors = ("script", "style", "noscript", "iframe", "form", "nav", ".share", ".social-share")
    for unwanted in unwanted_selectors:
        for match in node.select(unwanted):
            match.decompose()
def strip_dangerous_attributes(node: BeautifulSoup | Tag) -> None:
    """Delete script-capable attributes from every tag under ``node``, in place.

    Removed are all ``on*`` event handlers, ``srcdoc``, and any URL-bearing
    attribute whose value uses a ``javascript:``, ``vbscript:`` or
    ``data:text/html`` scheme.
    """
    url_attributes = {"href", "src", "action", "formaction", "xlink:href"}
    blocked_schemes = ("javascript:", "vbscript:", "data:text/html")

    for child in node.find_all(True):
        # Iterate over a copy: attributes are deleted during the loop.
        for attr_name in list(child.attrs):
            normalized_name = attr_name.lower()
            if normalized_name.startswith("on") or normalized_name == "srcdoc":
                del child.attrs[attr_name]
                continue
            if normalized_name not in url_attributes:
                continue

            raw_value = child.attrs.get(attr_name)
            if isinstance(raw_value, list):
                candidate = " ".join(str(item) for item in raw_value)
            else:
                candidate = str(raw_value or "")

            # Browsers drop leading control characters/spaces and every
            # tab/newline before resolving a URL, so "java\tscript:" still
            # executes. Apply the same normalization before the scheme test.
            scheme_probe = re.sub(r"^[\x00-\x20]+|[\t\n\r]+", "", candidate).lower()
            if scheme_probe.startswith(blocked_schemes):
                del child.attrs[attr_name]
def clone_tag(node: Tag) -> BeautifulSoup:
    """Return an independent copy of ``node`` by re-parsing its serialized markup."""
    serialized = str(node)
    return BeautifulSoup(serialized, "html.parser")
def find_title_node(soup: BeautifulSoup) -> Tag | None:
    """Return the first heading matched by the title selectors, or ``None``."""
    title_selectors = ("article h1", "h1.entry-title", "h1.page-title", "h1.title", "h1")
    candidates = (soup.select_one(selector) for selector in title_selectors)
    return next((candidate for candidate in candidates if candidate), None)
def extract_labeled_value(text: str, label: str) -> str:
    """Return the value following ``label:`` in ``text``, or ``""`` if absent.

    The value ends at the next ``Word:`` style label, at a run of two or more
    whitespace characters, or at the end of the text. Matching ignores case.
    """
    if not text:
        return ""
    labeled = re.compile(
        rf"{re.escape(label)}:\s*(.+?)(?=\s+(?:[A-Z][a-z]+:)|\s{{2,}}|$)",
        re.IGNORECASE,
    )
    found = labeled.search(clean_text(text))
    return clean_text(found.group(1)) if found else ""
def normalize_department_category(value: str) -> str:
    """Return ``value`` cleaned if it looks like a department name, else ``""``.

    Rejected are empty values, values over 80 characters or 8 words, and
    values containing contact-style markers (addresses, e-mail, URLs).
    """
    cleaned = clean_text(value)
    if not cleaned:
        return ""

    too_long = len(cleaned) > 80 or len(cleaned.split()) > 8
    if too_long:
        return ""

    lowered = cleaned.lower()
    contact_markers = ("p.o. box", "contact us", "@", "http://", "https://")
    if any(marker in lowered for marker in contact_markers):
        return ""
    return cleaned
def format_error_summary(
    url: str,
    exc: Exception,
    response: requests.Response | None,
    timeout_seconds: int,
) -> str:
    """Build the one-line error message for a failed scrape.

    Args:
        url: The URL that was being fetched.
        exc: The exception that aborted the scrape.
        response: The HTTP response if one was received, else ``None``.
        timeout_seconds: Configured timeout, quoted in timeout messages.
    """
    if isinstance(exc, requests.HTTPError):
        # Compare with ``None`` explicitly: a Response is falsy for 4xx/5xx
        # statuses, so ``exc.response or response`` would discard exactly
        # the responses this branch exists to report.
        failing_response = exc.response if exc.response is not None else response
        if failing_response is not None:
            return (
                f"HTTP {failing_response.status_code} {failing_response.reason} "
                f"while fetching {failing_response.url or url}"
            )
    if isinstance(exc, requests.Timeout):
        return f"Request timed out after {timeout_seconds}s while fetching {url}"
    if isinstance(exc, requests.RequestException):
        return f"{type(exc).__name__} while fetching {url}: {exc}"
    return f"{type(exc).__name__}: {exc}"
def format_error_details(
    url: str,
    exc: Exception,
    response: requests.Response | None,
) -> str:
    """Build the multi-line diagnostic text for a failed scrape.

    The text lists the URL, the exception type and message, the HTTP status
    and resolved URL when a response is available (taken from the exception
    first, then from ``response``), and the formatted exception.
    """
    details = [
        f"URL: {url}",
        f"Error Type: {type(exc).__name__}",
        f"Message: {exc}",
    ]

    # Compare with ``None`` explicitly: a Response is falsy for 4xx/5xx
    # statuses, so an ``or`` fallback would drop the failing response.
    failing_response = getattr(exc, "response", None)
    if failing_response is None:
        failing_response = response
    if failing_response is not None:
        details.extend(
            [
                f"HTTP Status: {failing_response.status_code} {failing_response.reason}",
                f"Resolved URL: {failing_response.url}",
            ]
        )

    trace = "".join(traceback.format_exception_only(type(exc), exc)).strip()
    if trace:
        details.append(f"Exception: {trace}")
    return "\n".join(details)
+91
View File
@@ -0,0 +1,91 @@
from __future__ import annotations
from email.utils import format_datetime
from io import StringIO
from xml.sax.saxutils import escape
import datetime as dt
from page_importer.dates import parse_datetime
from page_importer.models import ScrapedPost
def build_wxr(posts: list[ScrapedPost], channel_title: str = "Imported Content") -> str:
    """Serialize ``posts`` into a WordPress WXR 1.2 export document.

    Args:
        posts: Scraped posts to export; each becomes one ``<item>``.
        channel_title: Text for the channel ``<title>`` (XML-escaped).

    Returns:
        The complete XML document as a string. The channel ``<pubDate>`` is
        the current UTC time; that same instant is handed to
        ``_resolve_post_dates`` as the fallback for each post.
    """
    generated_at = dt.datetime.now(dt.timezone.utc)
    rss_open = (
        '<rss version="2.0" xmlns:excerpt="http://wordpress.org/export/1.2/excerpt/" '
        'xmlns:content="http://purl.org/rss/1.0/modules/content/" '
        'xmlns:wfw="http://wellformedweb.org/CommentAPI/" '
        'xmlns:dc="http://purl.org/dc/elements/1.1/" '
        'xmlns:wp="http://wordpress.org/export/1.2/">\n'
    )

    # Collect the document piecewise and join once at the end.
    chunks = [
        '<?xml version="1.0" encoding="UTF-8" ?>\n',
        rss_open,
        "<channel>\n",
        f"<title>{escape(channel_title)}</title>\n",
        "<link>http://localhost/</link>\n",
        "<description>Generated by Page Importer</description>\n",
        f"<pubDate>{format_datetime(generated_at)}</pubDate>\n",
        "<language>en-US</language>\n",
        "<wp:wxr_version>1.2</wp:wxr_version>\n",
    ]

    for post in posts:
        local_date, gmt_date, item_pub_date = _resolve_post_dates(
            post.publish_date, generated_at
        )
        chunks.extend(
            (
                "<item>\n",
                f"<title>{escape(post.title)}</title>\n",
                f"<link>{escape(post.source_url)}</link>\n",
                f"<pubDate>{format_datetime(item_pub_date)}</pubDate>\n",
                f"<dc:creator>{cdata(post.author or 'importer')}</dc:creator>\n",
                # The source URL doubles as a non-permalink GUID.
                f'<guid isPermaLink="false">{escape(post.source_url)}</guid>\n',
                "<description></description>\n",
                f"<content:encoded>{cdata(post.body_html)}</content:encoded>\n",
                f"<excerpt:encoded>{cdata('')}</excerpt:encoded>\n",
                f"<wp:post_date>{cdata(local_date)}</wp:post_date>\n",
                f"<wp:post_date_gmt>{cdata(gmt_date)}</wp:post_date_gmt>\n",
                "<wp:comment_status><![CDATA[closed]]></wp:comment_status>\n",
                "<wp:ping_status><![CDATA[closed]]></wp:ping_status>\n",
                "<wp:post_name><![CDATA[]]></wp:post_name>\n",
                f"<wp:status>{cdata(post.status)}</wp:status>\n",
                "<wp:post_parent>0</wp:post_parent>\n",
                "<wp:menu_order>0</wp:menu_order>\n",
                f"<wp:post_type>{cdata(post.post_type or 'post')}</wp:post_type>\n",
                "<wp:post_password><![CDATA[]]></wp:post_password>\n",
                "<wp:is_sticky>0</wp:is_sticky>\n",
            )
        )
        # Categories first, then tags; both are <category> elements that
        # differ only in their ``domain`` attribute.
        for domain, terms in (("category", post.categories), ("post_tag", post.tags)):
            chunks.extend(
                f'<category domain="{domain}" nicename="{escape(slugify(term))}">'
                f"{cdata(term)}</category>\n"
                for term in terms
            )
        chunks.append("</item>\n")

    chunks.append("</channel>\n</rss>\n")
    return "".join(chunks)
def slugify(value: str) -> str:
    """Return a lowercase, hyphen-separated slug for ``value``.

    Alphanumeric characters are lowercased and kept. Every run of other
    characters becomes a single hyphen, and leading/trailing separators are
    dropped, so ``"Foo & Bar"`` yields ``"foo-bar"`` rather than
    ``"foo---bar"``. An empty or ``None`` value yields ``""``.
    """
    # Map separators to spaces so str.split() both collapses runs and trims
    # the ends in one step; alphanumeric characters are never whitespace.
    spaced = "".join(ch.lower() if ch.isalnum() else " " for ch in (value or ""))
    return "-".join(spaced.split())
def cdata(value: str) -> str:
    """Wrap ``value`` in a CDATA section; a falsy value yields an empty section."""
    text = value or ""
    # A literal "]]>" would terminate the section early, so close the section
    # after "]]" and reopen a new one before ">".
    safe_text = text.replace("]]>", "]]]]><![CDATA[>")
    return "<![CDATA[" + safe_text + "]]>"
def _resolve_post_dates(value: str, fallback: dt.datetime) -> tuple[str, str, dt.datetime]:
    """Derive the local date, GMT date and pubDate datetime for one post.

    Returns a ``(local_date, gmt_date, pub_date)`` tuple:

    * When ``parse_datetime`` returns ``None``: two empty strings and
      ``fallback``.
    * When the parsed value has no usable UTC offset: the same formatted
      wall-clock time for both strings, and the parsed value tagged as UTC.
    * Otherwise: the wall-clock time as given, the UTC-converted time, and
      the UTC-converted datetime.
    """
    moment = parse_datetime(value)
    if moment is None:
        return "", "", fallback

    wall_clock = _format_wp_date(moment)
    has_offset = moment.tzinfo is not None and moment.utcoffset() is not None
    if has_offset:
        as_utc = moment.astimezone(dt.timezone.utc)
        return wall_clock, _format_wp_date(as_utc), as_utc

    # No offset information: treat the wall-clock time as if it were UTC.
    return wall_clock, wall_clock, moment.replace(tzinfo=dt.timezone.utc)
def _format_wp_date(value: dt.datetime) -> str:
    """Format ``value`` as ``YYYY-MM-DD HH:MM:SS``.

    Any timezone info is dropped without converting the clock time.
    """
    naive = value.replace(tzinfo=None)
    return f"{naive:%Y-%m-%d %H:%M:%S}"
+4
View File
@@ -0,0 +1,4 @@
streamlit>=1.43,<2
requests>=2.32,<3
beautifulsoup4>=4.12,<5
python-dateutil>=2.9,<3
+79
View File
@@ -0,0 +1,79 @@
from __future__ import annotations
import unittest
from bs4 import BeautifulSoup
from page_importer.dates import normalize_date
from page_importer.scraper import CATEGORY_SELECTORS, TAG_SELECTORS, extract_terms, sanitize_html
from page_importer.wxr import build_wxr
from page_importer.models import ScrapedPost
class DateNormalizationTests(unittest.TestCase):
    """Checks for ``normalize_date``."""

    def test_preserves_timezone_offset_in_normalized_value(self) -> None:
        # An ISO-8601 timestamp with an explicit offset must keep that offset.
        raw = "2024-05-01T09:30:00-07:00"
        expected = "2024-05-01 09:30:00-07:00"
        self.assertEqual(normalize_date(raw), expected)
class WxrSerializationTests(unittest.TestCase):
    """Checks on the XML text produced by ``build_wxr``."""

    def test_writes_local_and_gmt_dates_from_offset_timestamp(self) -> None:
        # 09:30 at UTC-07:00 is 16:30 UTC: the local date keeps the wall-clock
        # time, while the GMT date and pubDate carry the converted time.
        post = ScrapedPost(
            source_url="https://example.com/post",
            title="Example",
            body_html="<p>Body</p>",
            publish_date="2024-05-01 09:30:00-07:00",
            success=True,
        )
        xml = build_wxr([post])
        expected_fragments = (
            "<wp:post_date><![CDATA[2024-05-01 09:30:00]]></wp:post_date>",
            "<wp:post_date_gmt><![CDATA[2024-05-01 16:30:00]]></wp:post_date_gmt>",
            "<pubDate>Wed, 01 May 2024 16:30:00 +0000</pubDate>",
        )
        for fragment in expected_fragments:
            self.assertIn(fragment, xml)

    def test_splits_cdata_terminators_in_content(self) -> None:
        # A literal "]]>" in the body or author must be split across two
        # CDATA sections rather than ending the section early.
        post = ScrapedPost(
            source_url="https://example.com/post",
            title="Example",
            body_html="<p>alpha ]]> omega</p>",
            author="Jane ]]> Doe",
            success=True,
        )
        xml = build_wxr([post])
        for fragment in ("alpha ]]]]><![CDATA[> omega", "Jane ]]]]><![CDATA[> Doe"):
            self.assertIn(fragment, xml)
class HtmlSanitizationTests(unittest.TestCase):
    """Checks for ``sanitize_html``."""

    def test_removes_inline_event_handlers_and_script_uris(self) -> None:
        # Markup carrying two inline event handlers and a javascript: link.
        markup = (
            '<div onclick="alert(1)"><a href="javascript:alert(1)">x</a>'
            '<img src="x" onerror="alert(1)"></div>'
        )
        sanitized = sanitize_html(markup)
        for forbidden in ("onclick", "onerror", "javascript:"):
            self.assertNotIn(forbidden, sanitized)
class TaxonomySelectorTests(unittest.TestCase):
    """Checks that category and tag selectors pick up the right elements."""

    def test_drupal_tag_field_is_not_treated_as_category(self) -> None:
        # A Drupal-style tags field should yield a tag and no category.
        markup = '<div class="field--name-field-tags"><a href="/tags/example">Example Tag</a></div>'
        soup = BeautifulSoup(markup, "html.parser")

        found_categories = extract_terms(soup, CATEGORY_SELECTORS)
        self.assertEqual(found_categories, [])

        found_tags = extract_terms(soup, TAG_SELECTORS)
        self.assertEqual(found_tags, ["Example Tag"])
# Run the test cases in this module when the file is executed directly.
if __name__ == "__main__":
    unittest.main()