from __future__ import annotations

import csv
import datetime as dt
import hashlib
import io
import re
from dataclasses import replace

import streamlit as st

from page_importer.dates import parse_datetime
from page_importer.models import ScrapeOptions, ScrapedPost
from page_importer.scraper import Scraper
from page_importer.wxr import build_wxr


def load_csv(file_data: bytes) -> tuple[list[str], list[dict[str, str]]]:
    """Parse uploaded CSV bytes into ``(headers, rows)``.

    The bytes are decoded as UTF-8 (a leading BOM is dropped, undecodable
    bytes are replaced) and read with ``csv.DictReader``.

    Args:
        file_data: Raw contents of the uploaded file.

    Returns:
        The header names (empty list when the file has no header row) and
        one dict per data row.
    """
    text = file_data.decode("utf-8-sig", errors="replace")
    # newline="" lets the csv module do its own line splitting, so CR-only
    # and mixed line endings parse instead of raising.
    reader = csv.DictReader(io.StringIO(text, newline=""))
    rows = list(reader)
    return list(reader.fieldnames or []), rows


def build_upload_fingerprint(file_data: bytes) -> str:
    """Return the SHA-256 hex digest of the uploaded bytes."""
    return hashlib.sha256(file_data).hexdigest()


def sync_uploaded_file_state(session_state: dict[str, object], upload_fingerprint: str) -> None:
    """Drop cached scrape state when a different file has been uploaded.

    If the stored fingerprint equals ``upload_fingerprint`` nothing changes.
    Otherwise the ``results``, ``input_rows``, ``input_headers`` and
    ``scrape_context`` keys are removed and the new fingerprint is stored.
    """
    if session_state.get("uploaded_csv_fingerprint") == upload_fingerprint:
        return
    for key in ("results", "input_rows", "input_headers", "scrape_context"):
        session_state.pop(key, None)
    session_state["uploaded_csv_fingerprint"] = upload_fingerprint


def render_app() -> None:
    """Render the whole page: options, upload, scrape, results and export."""
    st.title("Page Importer")
    st.caption("Scrape blog posts from CSV URLs and export a WordPress WXR file.")

    with st.sidebar:
        st.header("Options")
        include_author = st.checkbox("Include author", value=True)
        include_categories = st.checkbox("Include categories", value=True)
        include_tags = st.checkbox("Include tags", value=True)
        force_heuristics = st.checkbox("Force heuristic scraping", value=False)
        test_run = st.checkbox(
            "Test run only",
            value=False,
            help="Scrape only the first 10 rows that contain a URL.",
        )
        post_type_mode = st.selectbox(
            "WordPress post type mode",
            ["Single type for all rows", "Use a CSV column"],
            index=0,
        )
        default_post_type = st.selectbox("Default WordPress post type", ["post", "page"], index=0)

    uploaded = st.file_uploader("Upload CSV", type=["csv"])
    if not uploaded:
        st.info("Upload a CSV to begin.")
        return

    uploaded_bytes = uploaded.getvalue()
    sync_uploaded_file_state(st.session_state, build_upload_fingerprint(uploaded_bytes))
    headers, rows = load_csv(uploaded_bytes)
    if not rows:
        st.error("The CSV did not contain any rows.")
        return

    optional_headers = ["(none)", *headers]
    col1, col2, col3 = st.columns(3)
    with col1:
        url_column = st.selectbox("URL column", headers, index=_safe_index(headers, ["url", "link"]))
    with col2:
        title_column = st.selectbox(
            "Optional title override column",
            optional_headers,
            index=_safe_index(optional_headers, ["name", "title"]),
        )
    with col3:
        post_type_column = st.selectbox(
            "Optional post type column",
            optional_headers,
            index=_safe_index(optional_headers, ["post_type", "type"]),
            disabled=post_type_mode != "Use a CSV column",
        )

    st.write(f"Loaded {len(rows)} row(s). Only the selected URL column will be scraped.")
    if test_run:
        st.caption("Test run is enabled. Only the first 10 rows with a URL will be scraped.")

    if st.button("Scrape URLs", type="primary"):
        context = build_scrape_context(
            include_author=include_author,
            include_categories=include_categories,
            include_tags=include_tags,
            force_heuristics=force_heuristics,
            test_run=test_run,
            post_type_mode=post_type_mode,
            post_type_column=post_type_column,
            default_post_type=default_post_type,
            url_column=url_column,
            title_column=title_column,
        )
        results = scrape_rows(rows, context, phase_label="Scraping")
        # Keep the inputs next to the results so retries and export use the
        # same rows/settings the scrape ran with.
        st.session_state["results"] = results
        st.session_state["input_rows"] = rows
        st.session_state["input_headers"] = headers
        st.session_state["scrape_context"] = context

    results = st.session_state.get("results", [])
    if not results:
        return

    successful = [post for post in results if post.success]
    failed = [post for post in results if not post.success]

    st.subheader("Results")
    st.write(f"Successful: {len(successful)} | Failed: {len(failed)}")

    if failed and st.button("Retry failed items"):
        stored_rows = st.session_state.get("input_rows", rows)
        context = st.session_state.get("scrape_context")
        if context:
            retried = scrape_rows(
                stored_rows,
                context,
                row_numbers=[post.row_number for post in failed if post.row_number],
                phase_label="Retrying",
            )
            results = merge_retry_results(results, retried)
            st.session_state["results"] = results
            successful = [post for post in results if post.success]
            failed = [post for post in results if not post.success]

    preview_rows = [
        {
            "Row": post.row_number,
            "URL": post.source_url,
            "CMS": post.cms,
            "Success": post.success,
            "Title": post.title,
            "Publish Date": post.publish_date,
            "Author": post.author,
            "Categories": ", ".join(post.categories),
            "Tags": ", ".join(post.tags),
            "Post Type": post.post_type,
            "Error": post.error,
        }
        for post in results
    ]
    st.dataframe(
        preview_rows,
        width="stretch",
        hide_index=True,
        column_config={
            "Row": st.column_config.NumberColumn(width="small"),
            "URL": st.column_config.TextColumn(width="medium"),
            "Title": st.column_config.TextColumn(width="medium"),
            "Publish Date": st.column_config.TextColumn(width="medium"),
            "Categories": st.column_config.TextColumn(width="medium"),
            "Tags": st.column_config.TextColumn(width="medium"),
            "Error": st.column_config.TextColumn(width="large"),
        },
    )

    if failed:
        selected_failed = st.selectbox(
            "Failed row details",
            failed,
            format_func=lambda post: f"Row {post.row_number}: {post.source_url or '(missing URL)'}",
        )
        st.text_area(
            "Error details",
            value=selected_failed.error_details or selected_failed.error,
            height=180,
            disabled=True,
        )

    if successful:
        selected_index = st.number_input(
            "Preview successful row",
            min_value=1,
            max_value=len(successful),
            value=1,
            step=1,
        )
        # The widget is 1-based; cast defensively before using it as an index.
        selected = successful[int(selected_index) - 1]
        st.markdown("### Content Preview")
        st.write(f"**Title:** {selected.title}")
        st.write(f"**Source URL:** {selected.source_url}")
        st.write(f"**Publish Date:** {selected.publish_date or '(missing)'}")
        st.write(f"**Author:** {selected.author or '(missing)'}")
        st.write(f"**Post Type:** {selected.post_type}")
        # NOTE(review): body_html comes from scraped pages and is rendered
        # unescaped; confirm the scraper sanitizes it before trusting this.
        st.write(selected.body_html, unsafe_allow_html=True)

        stored_rows = st.session_state.get("input_rows", rows)
        stored_headers = st.session_state.get("input_headers", headers)
        render_export_sidebar(successful, stored_rows, stored_headers)


def build_scrape_context(
    *,
    include_author: bool,
    include_categories: bool,
    include_tags: bool,
    force_heuristics: bool,
    test_run: bool,
    post_type_mode: str,
    post_type_column: str,
    default_post_type: str,
    url_column: str,
    title_column: str,
) -> dict[str, object]:
    """Bundle the current UI selections into a dict used by ``scrape_rows``.

    The four scraper flags are wrapped in a ``ScrapeOptions`` under the
    ``"options"`` key; every other argument is stored under its own name.
    """
    return {
        "options": ScrapeOptions(
            include_author=include_author,
            include_categories=include_categories,
            include_tags=include_tags,
            force_heuristics=force_heuristics,
        ),
        "test_run": test_run,
        "post_type_mode": post_type_mode,
        "post_type_column": post_type_column,
        "default_post_type": default_post_type,
        "url_column": url_column,
        "title_column": title_column,
    }


def scrape_rows(
    rows: list[dict[str, str]],
    context: dict[str, object],
    row_numbers: list[int] | None = None,
    phase_label: str = "Scraping",
) -> list[ScrapedPost]:
    """Scrape the URL of each targeted row and report progress in the UI.

    Args:
        rows: All CSV rows; row numbers are 1-based positions in this list.
        context: Dict produced by ``build_scrape_context``.
        row_numbers: When given, only these row numbers are processed.
            Otherwise, if ``context["test_run"]`` is truthy, only the first
            10 rows with a non-blank URL are processed; else every row is.
        phase_label: Prefix for the status messages.

    Returns:
        One ``ScrapedPost`` per targeted row, in row order. Rows without a
        URL yield a post carrying an error message instead of being scraped.

    Raises:
        TypeError: If ``context["options"]`` is not a ``ScrapeOptions``.
    """
    options = context["options"]
    if not isinstance(options, ScrapeOptions):
        raise TypeError("Invalid scrape options in session state.")

    scraper = Scraper(options)
    url_column = str(context["url_column"])

    targets = list(enumerate(rows, start=1))
    if row_numbers is not None:
        requested_rows = set(row_numbers)
        targets = [(row_number, row) for row_number, row in targets if row_number in requested_rows]
    elif bool(context.get("test_run")):
        targets = [
            (row_number, row)
            for row_number, row in targets
            if (row.get(url_column) or "").strip()
        ][:10]

    results: list[ScrapedPost] = []
    progress = st.progress(0.0)
    status = st.empty()
    total = len(targets) or 1  # avoid dividing by zero when nothing is targeted
    for index, (row_number, row) in enumerate(targets, start=1):
        url = (row.get(url_column) or "").strip()
        status.write(f"{phase_label} {index}/{len(targets)}: {url or f'row {row_number} has no URL'}")
        if url:
            post = scraper.scrape(url)
        else:
            post = ScrapedPost(
                source_url="",
                row_number=row_number,
                error="Missing URL in the selected URL column.",
                error_details=f"Row {row_number} does not contain a URL in column '{url_column}'.",
            )
        post.row_number = row_number
        apply_row_overrides(post, row, context)
        results.append(post)
        progress.progress(index / total)

    status.write(f"{phase_label} complete.")
    return results


def apply_row_overrides(post: ScrapedPost, row: dict[str, str], context: dict[str, object]) -> None:
    """Apply per-row CSV overrides (title, post type) to ``post`` in place.

    The title is replaced only when a title column is selected and the cell
    holds non-blank text, so a whitespace-only cell never wipes the scraped
    title. The post type is always set via ``resolve_post_type``.
    """
    title_column = context["title_column"]
    if isinstance(title_column, str) and title_column != "(none)":
        override = (row.get(title_column) or "").strip()
        if override:
            post.title = override

    post.post_type = resolve_post_type(
        row=row,
        mode=str(context["post_type_mode"]),
        column=str(context["post_type_column"]),
        default_value=str(context["default_post_type"]),
    )


def resolve_export_categories(
    row: dict[str, str],
    category_column: str,
    manual_categories: list[str],
) -> list[str]:
    """Return the row's CSV categories followed by the manual ones, de-duplicated.

    No CSV categories are read when ``category_column`` is ``"(none)"``.
    """
    csv_categories = parse_terms(row.get(category_column, "")) if category_column != "(none)" else []
    return merge_unique_terms(csv_categories, manual_categories)


def parse_terms(value: str) -> list[str]:
    """Split ``value`` on ``,``, ``|`` or ``>`` and return the non-blank, stripped parts."""
    return [term.strip() for term in re.split(r"[,|>]", value or "") if term.strip()]


def merge_unique_terms(*groups: list[str]) -> list[str]:
    """Concatenate the groups, stripping terms and keeping first occurrences only.

    Comparison is exact (case-sensitive) on the stripped text; blank terms
    are dropped.
    """
    merged: list[str] = []
    seen: set[str] = set()
    for group in groups:
        for term in group:
            cleaned = term.strip()
            if cleaned and cleaned not in seen:
                seen.add(cleaned)
                merged.append(cleaned)
    return merged


def merge_retry_results(existing: list[ScrapedPost], replacements: list[ScrapedPost]) -> list[ScrapedPost]:
    """Replace posts in ``existing`` by row number and return them sorted by row.

    Posts whose row number is missing sort first (treated as 0).
    """
    replacement_map = {post.row_number: post for post in replacements}
    merged = [replacement_map.get(post.row_number, post) for post in existing]
    return sorted(merged, key=lambda post: post.row_number or 0)


def build_export_posts(
    posts: list[ScrapedPost],
    rows: list[dict[str, str]],
    category_column: str,
    manual_categories: list[str],
    post_status: str,
    custom_post_type_slug: str,
) -> list[ScrapedPost]:
    """Return copies of ``posts`` with export-time status, type and categories.

    Each copy gets ``post_status``; its post type becomes
    ``custom_post_type_slug`` when that is non-empty; and its categories are
    extended with the row's CSV categories plus ``manual_categories``. A post
    whose row number is missing or out of range is treated as having an
    empty CSV row.
    """
    export_posts: list[ScrapedPost] = []
    for post in posts:
        row_number = post.row_number
        # row_number may be None/0; comparing None with an int would raise.
        in_range = bool(row_number) and 0 < row_number <= len(rows)
        row = rows[row_number - 1] if in_range else {}
        export_posts.append(
            replace(
                post,
                status=post_status,
                post_type=custom_post_type_slug or post.post_type,
                categories=merge_unique_terms(
                    post.categories,
                    resolve_export_categories(row, category_column, manual_categories),
                ),
            )
        )
    return export_posts


def render_export_sidebar(
    successful: list[ScrapedPost],
    rows: list[dict[str, str]],
    headers: list[str],
) -> None:
    """Render the export controls in the sidebar and offer the WXR download.

    Args:
        successful: Posts that scraped successfully.
        rows: CSV rows the posts were scraped from (for category lookup).
        headers: CSV header names offered as the category column.
    """
    with st.sidebar:
        st.markdown("---")
        st.subheader("Export")
        post_status = st.selectbox(
            "Imported post status",
            ["draft", "publish", "private"],
            index=0,
            key="export_post_status",
        )
        category_options = ["(none)", *headers]
        category_column = st.selectbox(
            "CSV category column",
            category_options,
            index=_safe_index(category_options, ["category", "categories", "department"]),
            key="export_category_column",
        )
        manual_categories = parse_terms(
            st.text_input(
                "Additional export categories",
                value="",
                help="Comma-separated categories to append to every exported item.",
                key="export_manual_categories",
            )
        )
        output_name = st.text_input(
            "Output filename",
            value="wordpress-import.xml",
            key="export_output_name",
        )
        custom_post_type_slug = normalize_post_type_slug(
            st.text_input(
                "Custom post type slug",
                value="",
                help="Optional. If set, all exported items will use this WordPress post type slug.",
                key="export_custom_post_type_slug",
            )
        )

        export_posts = build_export_posts(
            successful,
            rows,
            category_column,
            manual_categories,
            post_status,
            custom_post_type_slug,
        )
        if custom_post_type_slug:
            st.caption(f"Exporting all items as post type `{custom_post_type_slug}`.")

        # Parse each publish date once; posts without a parseable date are
        # left out of this list.
        dated_export_posts = []
        for post in export_posts:
            parsed_date = parse_publish_date(post.publish_date)
            if parsed_date:
                dated_export_posts.append((post, parsed_date))

        if dated_export_posts:
            all_dates = [publish_date for _, publish_date in dated_export_posts]
            min_date = min(all_dates)
            max_date = max(all_dates)
            filter_by_publish_date = st.checkbox(
                "Filter export by publish date",
                value=False,
                key="export_filter_by_publish_date",
            )
            if filter_by_publish_date:
                export_start = st.date_input(
                    "Export start date",
                    value=min_date,
                    min_value=min_date,
                    max_value=max_date,
                    format="MM/DD/YYYY",
                    key="export_start_date",
                )
                export_end = st.date_input(
                    "Export end date",
                    value=max_date,
                    min_value=min_date,
                    max_value=max_date,
                    format="MM/DD/YYYY",
                    key="export_end_date",
                )
                if export_start > export_end:
                    st.error("Export start date must be on or before the end date.")
                    export_posts = []
                else:
                    export_posts = [
                        post
                        for post, publish_date in dated_export_posts
                        if export_start <= publish_date <= export_end
                    ]
                    st.caption(
                        "Date filter: "
                        f"{export_start.strftime('%m/%d/%Y')} to {export_end.strftime('%m/%d/%Y')}."
                    )
                    undated_count = len(successful) - len(dated_export_posts)
                    if undated_count:
                        st.caption(f"Excluded {undated_count} successful item(s) with no publish date.")
        else:
            st.caption("No successful items have a publish date, so export date filtering is unavailable.")

        st.caption(f"Ready to export {len(export_posts)} post(s).")
        xml_data = build_wxr(export_posts)
        st.download_button(
            label="Download WXR XML",
            data=xml_data,
            file_name=output_name,
            mime="application/xml",
            disabled=not export_posts,
        )


def parse_publish_date(value: str) -> dt.date | None:
    """Return the date part of ``parse_datetime(value)``, or ``None`` if it yields ``None``."""
    parsed = parse_datetime(value)
    if parsed is None:
        return None
    return parsed.date()


def _safe_index(values: list[str], candidates: list[str]) -> int:
    """Return the index in ``values`` of the first matching candidate, else 0.

    ``values`` are compared lower-cased; ``candidates`` are compared as given,
    so they are expected to be lowercase already.
    """
    lowered = {value.lower(): idx for idx, value in enumerate(values)}
    for candidate in candidates:
        if candidate in lowered:
            return lowered[candidate]
    return 0


def resolve_post_type(
    row: dict[str, str],
    mode: str,
    column: str,
    default_value: str,
) -> str:
    """Pick the post type for a row.

    Returns ``default_value`` unless ``mode`` is ``"Use a CSV column"``, a
    column other than ``"(none)"`` is selected, and that cell normalizes to
    a non-empty slug.
    """
    if mode != "Use a CSV column" or column == "(none)":
        return default_value
    raw_value = normalize_post_type_slug(row.get(column) or "")
    if raw_value:
        return raw_value
    return default_value


def normalize_post_type_slug(value: str) -> str:
    """Lower-case and strip ``value``, removing anything outside ``a-z0-9_-``."""
    return re.sub(r"[^a-z0-9_-]", "", (value or "").strip().lower())


if __name__ == "__main__":
    st.set_page_config(page_title="Page Importer", layout="wide")
    render_app()