from __future__ import annotations import json import re import traceback from html import unescape from typing import Iterable import requests from bs4 import BeautifulSoup from bs4.element import NavigableString, Tag from page_importer.dates import normalize_date from page_importer.models import ScrapeOptions, ScrapedPost JSON_ARTICLE_TYPES = { "article", "blogposting", "newsarticle", "report", "webpage", } BODY_SELECTORS = [ "article .entry-content", "article .post-content", "article .node__content", "article .node .content", "article .node-content", "article .field-name-body .field-item", "article .field-name-body", "article .field--name-body", "article .article-body", "article .content", ".post-content", ".entry-content", ".node__content", ".node .content", ".node-content", ".field-name-body .field-item", ".field-name-body", ".field--name-body", ".article-body", "#content-area .node .content", "article", "main article", "main", ] CATEGORY_SELECTORS = [ ".cat-links a", ".post-categories a", ".field--name-field-category a", ".tags a[rel='category tag']", ".terms a", ".taxonomy a", ] TAG_SELECTORS = [ ".tags-links a", ".post-tags a", ".field--name-field-tags a", "a[rel='tag']", ".terms a", ] AUTHOR_SELECTORS = [ "[rel='author']", ".author a", ".byline a", ".submitted a", ".node__submitted a", ".node-info a", ".createdby", ] DATE_SELECTORS = [ "time[datetime]", "meta[property='article:published_time']", "meta[name='publish_date']", "meta[name='pubdate']", ".date-display-single", ".submitted", ".node-info", ] DRUPAL_TITLE_DATE_PATTERN = re.compile( r"(Monday|Tuesday|Wednesday|Thursday|Friday|Saturday|Sunday),\s+" r"([A-Za-z]+)\s+\d{1,2},\s+\d{4}" ) class Scraper: def __init__(self, options: ScrapeOptions) -> None: self.options = options self.session = requests.Session() self.session.headers.update({"User-Agent": options.user_agent}) def scrape(self, url: str) -> ScrapedPost: post = ScrapedPost(source_url=url) response: requests.Response | None = None try: response = self.session.get(url, timeout=self.options.request_timeout) response.raise_for_status() soup = BeautifulSoup(response.text, "html.parser") post.cms = detect_cms(soup) article_data = extract_article_json_ld(soup) if article_data and not self.options.force_heuristics: apply_article_data(post, article_data, soup, self.options) merge_fallback_data(post, soup, self.options) post.body_html = sanitize_html(post.body_html) missing_fields = [field for field, value in {"title": post.title, "body_html": post.body_html}.items() if not value] if missing_fields: raise ValueError( "Unable to extract required field(s): " f"{', '.join(missing_fields)}. " f"Detected CMS: {post.cms}. " f"Publish date found: {'yes' if post.publish_date else 'no'}. " f"Author found: {'yes' if post.author else 'no'}." ) post.success = True return post except Exception as exc: post.error = format_error_summary(url, exc, response, self.options.request_timeout) post.error_details = format_error_details(url, exc, response) return post def detect_cms(soup: BeautifulSoup) -> str: generator = meta_content(soup, "meta", {"name": "generator"}) html = str(soup).lower() if generator: g = generator.lower() if "wordpress" in g: return "wordpress" if "drupal" in g: return "drupal" if "joomla" in g: return "joomla" if "/wp-content/" in html: return "wordpress" if "drupal-settings-json" in html or "sites/default/files" in html: return "drupal" if "com_content" in html or "joomla" in html: return "joomla" return "unknown" def extract_article_json_ld(soup: BeautifulSoup) -> dict | None: for script in soup.select("script[type='application/ld+json']"): raw = script.string or script.get_text(" ", strip=True) if not raw: continue for payload in parse_json_candidates(raw): article = find_article_payload(payload) if article: return article return None def parse_json_candidates(raw: str) -> Iterable[dict | list]: try: data = json.loads(raw) yield data return except json.JSONDecodeError: pass cleaned = re.sub(r"[\x00-\x1f]+", " ", raw).strip() try: data = json.loads(cleaned) yield data except json.JSONDecodeError: return def find_article_payload(payload: dict | list) -> dict | None: if isinstance(payload, list): for item in payload: found = find_article_payload(item) if found: return found return None if not isinstance(payload, dict): return None if "@graph" in payload: found = find_article_payload(payload["@graph"]) if found: return found node_type = payload.get("@type") types = {node_type.lower()} if isinstance(node_type, str) else { item.lower() for item in node_type or [] if isinstance(item, str) } if types & JSON_ARTICLE_TYPES: return payload return None def apply_article_data( post: ScrapedPost, article: dict, soup: BeautifulSoup, options: ScrapeOptions, ) -> None: post.title = article.get("headline") or article.get("name") or post.title post.publish_date = normalize_date( article.get("datePublished") or article.get("dateCreated") or post.publish_date ) if options.include_author: post.author = extract_author_from_json_ld(article) or post.author if options.include_categories: post.categories = normalize_terms(article.get("articleSection")) or post.categories if options.include_tags: post.tags = normalize_terms(article.get("keywords")) or post.tags post.body_html = extract_body_from_article(article, soup) or post.body_html def merge_fallback_data(post: ScrapedPost, soup: BeautifulSoup, options: ScrapeOptions) -> None: if not post.title: post.title = extract_title(soup) if not post.publish_date: post.publish_date = extract_date(soup, post.cms) if options.include_author and not post.author: post.author = extract_author(soup) if not post.body_html: post.body_html = extract_body(soup) if options.include_categories: post.categories = merge_terms(post.categories, extract_terms(soup, CATEGORY_SELECTORS)) if post.cms == "drupal": post.categories = merge_terms(post.categories, extract_drupal_department_categories(soup)) if options.include_tags and not post.tags: post.tags = extract_terms(soup, TAG_SELECTORS) def extract_title(soup: BeautifulSoup) -> str: og_title = meta_content(soup, "meta", {"property": "og:title"}) if og_title: return og_title for selector in ("article h1", "h1.entry-title", "h1.page-title", "h1.title", "h1"): node = soup.select_one(selector) if node: return clean_text(node.get_text(" ", strip=True)) return clean_text(soup.title.get_text(" ", strip=True)) if soup.title else "" def extract_date(soup: BeautifulSoup, cms: str = "unknown") -> str: for selector in DATE_SELECTORS: node = soup.select_one(selector) if not node: continue candidate = node.get("datetime") or node.get("content") or node.get_text(" ", strip=True) normalized = normalize_date(candidate) if normalized: return normalized if cms == "drupal": return extract_drupal_title_adjacent_date(soup) return "" def extract_author(soup: BeautifulSoup) -> str: author = meta_content(soup, "meta", {"name": "author"}) if author: return clean_text(author) for selector in AUTHOR_SELECTORS: node = soup.select_one(selector) if node: return clean_text(node.get_text(" ", strip=True)) return "" def extract_body(soup: BeautifulSoup) -> str: fallback_html = "" for selector in BODY_SELECTORS: node = soup.select_one(selector) if not node: continue candidate = clone_tag(node) strip_unwanted(candidate) html = candidate.decode_contents().strip() text_length = len(BeautifulSoup(html, "html.parser").get_text(" ", strip=True)) if text_length >= 120: return html if not fallback_html and has_meaningful_body_content(html): fallback_html = html return fallback_html def extract_terms(soup: BeautifulSoup, selectors: list[str]) -> list[str]: terms: list[str] = [] for selector in selectors: for node in soup.select(selector): term = clean_text(node.get_text(" ", strip=True)) if term and term not in terms: terms.append(term) return terms def extract_drupal_title_adjacent_date(soup: BeautifulSoup) -> str: title_node = find_title_node(soup) if not title_node: return "" for sibling in title_node.next_siblings: candidate = text_from_node(sibling) normalized = normalize_drupal_date(candidate) if normalized: return normalized header = title_node.find_parent(["header", "div", "section"]) if header: header_text = clean_text(header.get_text(" ", strip=True)) title_text = clean_text(title_node.get_text(" ", strip=True)) if title_text and header_text.startswith(title_text): header_text = clean_text(header_text[len(title_text):]) normalized = normalize_drupal_date(header_text) if normalized: return normalized return "" def extract_drupal_department_categories(soup: BeautifulSoup) -> list[str]: categories: list[str] = [] label_pattern = re.compile(r"^\s*Department:\s*$", re.IGNORECASE) for label_node in soup.find_all(string=label_pattern): parent = label_node.parent if isinstance(label_node.parent, Tag) else None if not parent: continue inline_value = extract_labeled_value(parent.get_text(" ", strip=True), "Department") normalized_inline_value = normalize_department_category(inline_value) if normalized_inline_value: categories = merge_terms(categories, [normalized_inline_value]) continue for sibling in parent.next_siblings: value = normalize_department_category(text_from_node(sibling)) if value: categories = merge_terms(categories, [value]) break for candidate in soup.find_all(["p", "li", "span", "dt", "dd"]): text = clean_text(candidate.get_text(" ", strip=True)) if not text.lower().startswith("department:"): continue extracted = normalize_department_category(extract_labeled_value(text, "Department")) if extracted: categories = merge_terms(categories, [extracted]) return categories def extract_author_from_json_ld(article: dict) -> str: author = article.get("author") if isinstance(author, dict): return clean_text(author.get("name", "")) if isinstance(author, list): names = [clean_text(item.get("name", "")) for item in author if isinstance(item, dict)] return ", ".join(name for name in names if name) if isinstance(author, str): return clean_text(author) return "" def extract_body_from_article(article: dict, soup: BeautifulSoup) -> str: body = article.get("articleBody") if isinstance(body, str) and len(body.strip()) > 120: return f"
{unescape(body.strip())}
" return extract_body(soup) def normalize_terms(value: object) -> list[str]: if isinstance(value, str): parts = re.split(r"[,|>]", value) return [clean_text(part) for part in parts if clean_text(part)] if isinstance(value, list): result: list[str] = [] for item in value: if isinstance(item, str): cleaned = clean_text(item) if cleaned and cleaned not in result: result.append(cleaned) return result return [] def merge_terms(*groups: list[str]) -> list[str]: merged: list[str] = [] for group in groups: for item in group: cleaned = clean_text(item) if cleaned and cleaned not in merged: merged.append(cleaned) return merged def normalize_drupal_date(value: str | None) -> str: if not value: return "" match = DRUPAL_TITLE_DATE_PATTERN.search(value) if not match: return "" return normalize_date(match.group(0)) def meta_content(soup: BeautifulSoup, tag_name: str, attrs: dict[str, str]) -> str: node = soup.find(tag_name, attrs=attrs) if node and node.get("content"): return node["content"].strip() return "" def clean_text(value: str) -> str: return re.sub(r"\s+", " ", value or "").strip() def text_from_node(node: object) -> str: if isinstance(node, NavigableString): return clean_text(str(node)) if isinstance(node, Tag): return clean_text(node.get_text(" ", strip=True)) return "" def sanitize_html(html: str) -> str: if not html: return "" soup = BeautifulSoup(html, "html.parser") strip_unwanted(soup) strip_dangerous_attributes(soup) return soup.decode_contents().strip() def has_meaningful_body_content(html: str) -> bool: if not html: return False text = BeautifulSoup(html, "html.parser").get_text(" ", strip=True) return bool(text) or any(token in html.lower() for token in ("