556 lines
17 KiB
Python
556 lines
17 KiB
Python
from __future__ import annotations
|
|
|
|
import json
|
|
import re
|
|
import traceback
|
|
from html import unescape
|
|
from typing import Iterable
|
|
|
|
import requests
|
|
from bs4 import BeautifulSoup
|
|
from bs4.element import NavigableString, Tag
|
|
|
|
from page_importer.dates import normalize_date
|
|
from page_importer.models import ScrapeOptions, ScrapedPost
|
|
|
|
# Lower-cased JSON-LD ``@type`` values that mark a node as usable article
# metadata. Lookups lower-case the node's type before comparing.
JSON_ARTICLE_TYPES: set[str] = {
    "article",
    "blogposting",
    "newsarticle",
    "report",
    "webpage",
}
|
|
|
|
# CSS selectors probed, in this order, when looking for the article body.
# Order matters: the first selector whose match is long enough wins.
BODY_SELECTORS: list[str] = [
    # Content containers nested inside an <article> element.
    "article .entry-content",
    "article .post-content",
    "article .node__content",
    "article .node .content",
    "article .node-content",
    "article .field-name-body .field-item",
    "article .field-name-body",
    "article .field--name-body",
    "article .article-body",
    "article .content",
    # The same kind of containers anywhere in the document.
    ".post-content",
    ".entry-content",
    ".node__content",
    ".node .content",
    ".node-content",
    ".field-name-body .field-item",
    ".field-name-body",
    ".field--name-body",
    ".article-body",
    "#content-area .node .content",
    # Whole-element fallbacks.
    "article",
    "main article",
    "main",
]
|
|
|
|
# CSS selectors whose matching elements' text is collected as category names.
CATEGORY_SELECTORS: list[str] = [
    ".cat-links a",
    ".post-categories a",
    ".field--name-field-category a",
    ".tags a[rel='category tag']",
    ".terms a",
    ".taxonomy a",
]
|
|
|
|
# CSS selectors whose matching elements' text is collected as tag names.
TAG_SELECTORS: list[str] = [
    ".tags-links a",
    ".post-tags a",
    ".field--name-field-tags a",
    "a[rel='tag']",
    ".terms a",
]
|
|
|
|
# CSS selectors probed, in order, for an element holding the author's name.
AUTHOR_SELECTORS: list[str] = [
    "[rel='author']",
    ".author a",
    ".byline a",
    ".submitted a",
    ".node__submitted a",
    ".node-info a",
    ".createdby",
]
|
|
|
|
# CSS selectors probed, in order, for the publish date. Attribute-bearing
# elements come first, then elements whose visible text may contain a date.
DATE_SELECTORS: list[str] = [
    "time[datetime]",
    "meta[property='article:published_time']",
    "meta[name='publish_date']",
    "meta[name='pubdate']",
    ".date-display-single",
    ".submitted",
    ".node-info",
]
|
|
|
|
# Matches a long-form date such as "Monday, January 5, 2024": an English
# weekday name, a comma, a month word, a 1-2 digit day, a comma and a
# 4-digit year.
DRUPAL_TITLE_DATE_PATTERN = re.compile(
    r"(Monday|Tuesday|Wednesday|Thursday|Friday|Saturday|Sunday),\s+"
    r"([A-Za-z]+)\s+\d{1,2},\s+\d{4}"
)
|
|
|
|
|
|
class Scraper:
    """Download a page and convert it into a ``ScrapedPost``.

    A single ``requests.Session`` is created per instance and reused by every
    ``scrape`` call; its ``User-Agent`` header is taken from the options.
    """

    def __init__(self, options: ScrapeOptions) -> None:
        """Keep *options* and prepare the HTTP session."""
        session = requests.Session()
        session.headers.update({"User-Agent": options.user_agent})
        self.options = options
        self.session = session

    def scrape(self, url: str) -> ScrapedPost:
        """Fetch *url* and extract its article fields.

        Structured JSON-LD data is applied first (unless
        ``options.force_heuristics`` is set), HTML heuristics then fill the
        gaps, and the resulting body is sanitised.

        Returns:
            The populated post. This method does not raise: on any failure
            the post is returned with ``error`` and ``error_details`` filled
            in, and ``success`` is only set to ``True`` when both a title and
            a body were extracted.
        """
        options = self.options
        post = ScrapedPost(source_url=url)
        response: requests.Response | None = None
        try:
            response = self.session.get(url, timeout=options.request_timeout)
            response.raise_for_status()
            soup = BeautifulSoup(response.text, "html.parser")
            post.cms = detect_cms(soup)

            article_data = extract_article_json_ld(soup)
            if article_data and not options.force_heuristics:
                apply_article_data(post, article_data, soup, options)

            merge_fallback_data(post, soup, options)
            post.body_html = sanitize_html(post.body_html)

            # Title and body are the only mandatory fields.
            required = (("title", post.title), ("body_html", post.body_html))
            missing_fields = [name for name, value in required if not value]
            if missing_fields:
                date_found = "yes" if post.publish_date else "no"
                author_found = "yes" if post.author else "no"
                raise ValueError(
                    "Unable to extract required field(s): "
                    f"{', '.join(missing_fields)}. "
                    f"Detected CMS: {post.cms}. "
                    f"Publish date found: {date_found}. "
                    f"Author found: {author_found}."
                )

            post.success = True
            return post
        except Exception as exc:
            # Boundary handler: every failure is reported on the post itself.
            post.error = format_error_summary(url, exc, response, options.request_timeout)
            post.error_details = format_error_details(url, exc, response)
            return post
|
|
|
|
|
|
def detect_cms(soup: BeautifulSoup) -> str:
    """Guess which CMS produced the page.

    The ``<meta name="generator">`` tag is consulted first; if it does not
    name a known CMS, the serialised markup is searched for tell-tale
    substrings.

    Returns:
        ``"wordpress"``, ``"drupal"``, ``"joomla"`` or ``"unknown"``.
    """
    generator = meta_content(soup, "meta", {"name": "generator"}).lower()
    for cms in ("wordpress", "drupal", "joomla"):
        if cms in generator:
            return cms

    # Serialising and lower-casing the whole document is the expensive step,
    # so it is deferred until the generator tag has failed to decide.
    html = str(soup).lower()
    if "/wp-content/" in html:
        return "wordpress"
    if "drupal-settings-json" in html or "sites/default/files" in html:
        return "drupal"
    if "com_content" in html or "joomla" in html:
        return "joomla"
    return "unknown"
|
|
|
|
|
|
def extract_article_json_ld(soup: BeautifulSoup) -> dict | None:
    """Return the first article-like JSON-LD node found in the page.

    Every ``<script type="application/ld+json">`` element is examined in
    document order. Returns ``None`` when no script yields an article node.
    """
    for script in soup.select("script[type='application/ld+json']"):
        raw = script.string
        if not raw:
            raw = script.get_text(" ", strip=True)
        if not raw:
            continue
        # Lazily search each decoded payload; stop at the first hit.
        hits = (find_article_payload(payload) for payload in parse_json_candidates(raw))
        article = next((hit for hit in hits if hit), None)
        if article:
            return article
    return None
|
|
|
|
|
|
def parse_json_candidates(raw: str) -> Iterable[dict | list]:
    """Yield the decoded JSON document contained in *raw*, if any.

    The text is decoded as-is first. If that fails, runs of ASCII control
    characters are replaced by a single space and decoding is retried once.
    Nothing is yielded when both attempts fail.
    """
    try:
        parsed = json.loads(raw)
    except json.JSONDecodeError:
        pass
    else:
        yield parsed
        return

    # Raw newlines/tabs inside string values make strict JSON decoding fail.
    sanitized = re.sub(r"[\x00-\x1f]+", " ", raw).strip()
    try:
        parsed = json.loads(sanitized)
    except json.JSONDecodeError:
        return
    yield parsed
|
|
|
|
|
|
def find_article_payload(payload: dict | list) -> dict | None:
    """Search a decoded JSON-LD document for an article-like node.

    Lists are searched element by element, and a dict's ``@graph`` member is
    searched before the dict itself is considered. A dict matches when its
    ``@type`` (a string or a list of strings, compared case-insensitively)
    intersects ``JSON_ARTICLE_TYPES``.

    Returns:
        The first matching dict, or ``None``. Any other input shape,
        including a malformed ``@type``, yields ``None`` rather than raising.
    """
    if isinstance(payload, list):
        for item in payload:
            found = find_article_payload(item)
            if found:
                return found
        return None
    if not isinstance(payload, dict):
        return None

    if "@graph" in payload:
        found = find_article_payload(payload["@graph"])
        if found:
            return found

    node_type = payload.get("@type")
    if isinstance(node_type, str):
        declared = [node_type]
    elif isinstance(node_type, (list, tuple)):
        declared = node_type
    else:
        # Page-supplied data: numbers, objects or null here must not abort
        # the scrape, so they are treated as "no type declared".
        declared = []

    types = {item.lower() for item in declared if isinstance(item, str)}
    if not types:
        return None
    return payload if types & JSON_ARTICLE_TYPES else None
|
|
|
|
|
|
def apply_article_data(
    post: ScrapedPost,
    article: dict,
    soup: BeautifulSoup,
    options: ScrapeOptions,
) -> None:
    """Copy fields from a JSON-LD article node onto *post*, in place.

    A value already on *post* is kept whenever the JSON-LD node has nothing
    usable for that field. Author, categories and tags are only touched when
    the matching ``include_*`` option is enabled.
    """
    headline = article.get("headline") or article.get("name")
    post.title = headline or post.title

    raw_date = article.get("datePublished") or article.get("dateCreated") or post.publish_date
    post.publish_date = normalize_date(raw_date)

    if options.include_author:
        post.author = extract_author_from_json_ld(article) or post.author
    if options.include_categories:
        post.categories = normalize_terms(article.get("articleSection")) or post.categories
    if options.include_tags:
        post.tags = normalize_terms(article.get("keywords")) or post.tags

    post.body_html = extract_body_from_article(article, soup) or post.body_html
|
|
|
|
|
|
def merge_fallback_data(post: ScrapedPost, soup: BeautifulSoup, options: ScrapeOptions) -> None:
    """Fill the gaps in *post* from HTML heuristics, in place.

    Title, publish date, author, body and tags are only extracted when still
    empty. Categories found in the markup are always merged into the existing
    ones (when categories are enabled).
    """
    if not post.title:
        post.title = extract_title(soup)
    if not post.publish_date:
        post.publish_date = extract_date(soup, post.cms)
    if options.include_author and not post.author:
        post.author = extract_author(soup)
    if not post.body_html:
        post.body_html = extract_body(soup)

    if options.include_categories:
        category_groups = [post.categories, extract_terms(soup, CATEGORY_SELECTORS)]
        # NOTE(review): the Drupal department lookup is kept under the
        # include_categories guard - confirm that is the intended scope.
        if post.cms == "drupal":
            category_groups.append(extract_drupal_department_categories(soup))
        post.categories = merge_terms(*category_groups)

    if options.include_tags and not post.tags:
        post.tags = extract_terms(soup, TAG_SELECTORS)
|
|
|
|
|
|
def extract_title(soup: BeautifulSoup) -> str:
    """Return the page title with whitespace collapsed, or ``""``.

    Sources, in priority order: the ``og:title`` meta tag, the first heading
    located by ``find_title_node``, and finally the document ``<title>``.
    """
    # Normalise og:title like every other source so embedded newlines or
    # repeated spaces do not leak into the title.
    og_title = clean_text(meta_content(soup, "meta", {"property": "og:title"}))
    if og_title:
        return og_title

    # Reuse the shared heading lookup instead of repeating its selector list.
    heading = find_title_node(soup)
    if heading is not None:
        return clean_text(heading.get_text(" ", strip=True))

    if soup.title:
        return clean_text(soup.title.get_text(" ", strip=True))
    return ""
|
|
|
|
|
|
def extract_date(soup: BeautifulSoup, cms: str = "unknown") -> str:
    """Return the normalised publish date found in the markup, or ``""``.

    Each selector in ``DATE_SELECTORS`` is tried in order; only its first
    match is examined, preferring the ``datetime`` attribute, then
    ``content``, then the element text. For Drupal pages a date adjacent to
    the title is used as a last resort.
    """
    for selector in DATE_SELECTORS:
        node = soup.select_one(selector)
        if node:
            raw_value = node.get("datetime") or node.get("content") or node.get_text(" ", strip=True)
            parsed = normalize_date(raw_value)
            if parsed:
                return parsed

    return extract_drupal_title_adjacent_date(soup) if cms == "drupal" else ""
|
|
|
|
|
|
def extract_author(soup: BeautifulSoup) -> str:
    """Return the author name found in the markup, or ``""``.

    The ``<meta name="author">`` tag wins; otherwise the text of the first
    element matched by any ``AUTHOR_SELECTORS`` entry is used.
    """
    meta_author = meta_content(soup, "meta", {"name": "author"})
    if meta_author:
        return clean_text(meta_author)

    matches = (soup.select_one(selector) for selector in AUTHOR_SELECTORS)
    first_match = next((node for node in matches if node), None)
    if first_match:
        return clean_text(first_match.get_text(" ", strip=True))
    return ""
|
|
|
|
|
|
def extract_body(soup: BeautifulSoup) -> str:
    """Return the inner HTML of the most likely article body, or ``""``.

    ``BODY_SELECTORS`` are tried in order. The first match holding at least
    120 characters of visible text is returned at once; otherwise the first
    shorter match that still has meaningful content is returned.
    """
    short_fallback = ""
    for selector in BODY_SELECTORS:
        match = soup.select_one(selector)
        if match:
            # Work on a copy so the caller's tree is left untouched.
            working_copy = clone_tag(match)
            strip_unwanted(working_copy)
            inner_html = working_copy.decode_contents().strip()
            visible_text = BeautifulSoup(inner_html, "html.parser").get_text(" ", strip=True)
            if len(visible_text) >= 120:
                return inner_html
            if not short_fallback and has_meaningful_body_content(inner_html):
                short_fallback = inner_html
    return short_fallback
|
|
|
|
|
|
def extract_terms(soup: BeautifulSoup, selectors: list[str]) -> list[str]:
    """Collect the distinct, non-empty texts of all elements matching *selectors*.

    Terms keep the order in which they are first encountered (selector order,
    then document order within a selector).
    """
    # A dict preserves insertion order and ignores repeated keys.
    seen: dict[str, None] = {}
    for selector in selectors:
        for node in soup.select(selector):
            label = clean_text(node.get_text(" ", strip=True))
            if label:
                seen.setdefault(label, None)
    return list(seen)
|
|
|
|
|
|
def extract_drupal_title_adjacent_date(soup: BeautifulSoup) -> str:
    """Find a long-form date printed next to the page title.

    The siblings that follow the title node are checked first. Failing that,
    the text of the nearest enclosing ``header``/``div``/``section`` is
    checked, with the title itself removed from its start.

    Returns:
        The normalised date, or ``""`` when none is found.
    """
    heading = find_title_node(soup)
    if not heading:
        return ""

    sibling_dates = (
        normalize_drupal_date(text_from_node(sibling)) for sibling in heading.next_siblings
    )
    from_sibling = next((value for value in sibling_dates if value), None)
    if from_sibling:
        return from_sibling

    container = heading.find_parent(["header", "div", "section"])
    if not container:
        return ""

    container_text = clean_text(container.get_text(" ", strip=True))
    heading_text = clean_text(heading.get_text(" ", strip=True))
    if heading_text and container_text.startswith(heading_text):
        # Drop the title so only the text after it is scanned for a date.
        container_text = clean_text(container_text[len(heading_text):])
    return normalize_drupal_date(container_text) or ""
|
|
|
|
|
|
def extract_drupal_department_categories(soup: BeautifulSoup) -> list[str]:
    """Collect category names from "Department:" labels in the page.

    Two passes are made. The first finds text nodes consisting only of the
    label and takes the value from the label's parent text or, failing that,
    from the first following sibling that yields an acceptable value. The
    second scans ``p``/``li``/``span``/``dt``/``dd`` elements whose text
    starts with the label.

    Returns:
        Distinct department names in discovery order.
    """
    categories: list[str] = []
    label_only = re.compile(r"^\s*Department:\s*$", re.IGNORECASE)

    for label_text in soup.find_all(string=label_only):
        holder = label_text.parent
        if not isinstance(holder, Tag):
            continue

        inline = normalize_department_category(
            extract_labeled_value(holder.get_text(" ", strip=True), "Department")
        )
        if inline:
            categories = merge_terms(categories, [inline])
            continue

        # The value is not inside the label's element: take the first usable
        # sibling that follows it.
        following = (
            normalize_department_category(text_from_node(sibling))
            for sibling in holder.next_siblings
        )
        first_value = next((value for value in following if value), None)
        if first_value:
            categories = merge_terms(categories, [first_value])

    for element in soup.find_all(["p", "li", "span", "dt", "dd"]):
        line = clean_text(element.get_text(" ", strip=True))
        if line.lower().startswith("department:"):
            department = normalize_department_category(extract_labeled_value(line, "Department"))
            if department:
                categories = merge_terms(categories, [department])

    return categories
|
|
|
|
|
|
def extract_author_from_json_ld(article: dict) -> str:
    """Return the author name(s) declared by a JSON-LD article node.

    ``author`` may be a string, an object with a ``name``, or a list mixing
    both forms. Multiple names are joined with ``", "``.

    Returns:
        The cleaned name(s), or ``""`` when no usable name is present.
        Entries or names that are not strings are skipped instead of raising.
    """
    author = article.get("author")
    entries = author if isinstance(author, list) else [author]

    names: list[str] = []
    for entry in entries:
        # Plain strings inside an author list are names too.
        name = entry.get("name") if isinstance(entry, dict) else entry
        if not isinstance(name, str):
            continue
        cleaned = clean_text(name)
        if cleaned:
            names.append(cleaned)
    return ", ".join(names)
|
|
|
|
|
|
def extract_body_from_article(article: dict, soup: BeautifulSoup) -> str:
    """Return body HTML for an article, preferring its JSON-LD ``articleBody``.

    When ``articleBody`` is a string longer than 120 characters after
    trimming, it is entity-decoded and wrapped in a single ``<p>`` element.
    Otherwise the body is extracted from the page markup.
    """
    body = article.get("articleBody")
    if not isinstance(body, str):
        return extract_body(soup)

    trimmed = body.strip()
    if len(trimmed) <= 120:
        return extract_body(soup)
    return f"<p>{unescape(trimmed)}</p>"
|
|
|
|
|
|
def normalize_terms(value: object) -> list[str]:
    """Turn a JSON-LD term value into a clean, de-duplicated list of strings.

    A string is split on ``,``, ``|`` and ``>``. A list contributes its
    string items, and non-string items are ignored. Any other value gives an
    empty list. Both input forms drop empty entries and repeats, keeping the
    first occurrence.
    """
    if isinstance(value, str):
        candidates = re.split(r"[,|>]", value)
    elif isinstance(value, list):
        candidates = [item for item in value if isinstance(item, str)]
    else:
        return []

    result: list[str] = []
    for candidate in candidates:
        cleaned = clean_text(candidate)
        if cleaned and cleaned not in result:
            result.append(cleaned)
    return result
|
|
|
|
|
|
def merge_terms(*groups: list[str]) -> list[str]:
    """Concatenate term lists into one list without blanks or repeats.

    Every term is whitespace-normalised first; the first occurrence of a term
    decides its position in the result.
    """
    cleaned_terms = (clean_text(item) for group in groups for item in group)
    # dict.fromkeys keeps first-seen order while dropping duplicates.
    return list(dict.fromkeys(term for term in cleaned_terms if term))
|
|
|
|
|
|
def normalize_drupal_date(value: str | None) -> str:
    """Normalise the first long-form date found in *value*.

    Returns ``""`` when *value* is empty or contains no text matching
    ``DRUPAL_TITLE_DATE_PATTERN``; otherwise returns whatever
    ``normalize_date`` produces for the matched text.
    """
    if value:
        found = DRUPAL_TITLE_DATE_PATTERN.search(value)
        if found:
            return normalize_date(found.group(0))
    return ""
|
|
|
|
|
|
def meta_content(soup: BeautifulSoup, tag_name: str, attrs: dict[str, str]) -> str:
    """Return the trimmed ``content`` attribute of the first matching tag.

    Returns ``""`` when no tag matches or its ``content`` is missing or empty.
    """
    node = soup.find(tag_name, attrs=attrs)
    if not node:
        return ""
    if not node.get("content"):
        return ""
    return node["content"].strip()
|
|
|
|
|
|
def clean_text(value: str) -> str:
    """Collapse every whitespace run in *value* to one space and trim the ends.

    A falsy *value* (including ``None``) is treated as the empty string.
    """
    text = value if value else ""
    collapsed = re.sub(r"\s+", " ", text)
    return collapsed.strip()
|
|
|
|
|
|
def text_from_node(node: object) -> str:
    """Return the whitespace-normalised text of a parse-tree node.

    Handles both text nodes and tags. Any other object yields ``""``.
    """
    if isinstance(node, NavigableString):
        raw_text = str(node)
    elif isinstance(node, Tag):
        raw_text = node.get_text(" ", strip=True)
    else:
        return ""
    return clean_text(raw_text)
|
|
|
|
|
|
def sanitize_html(html: str) -> str:
    """Return *html* with unwanted elements and dangerous attributes removed.

    An empty or missing input yields ``""``. The result is re-serialised and
    trimmed.
    """
    if not html:
        return ""
    fragment = BeautifulSoup(html, "html.parser")
    # Elements go first, then attributes on whatever is left.
    for scrub in (strip_unwanted, strip_dangerous_attributes):
        scrub(fragment)
    return fragment.decode_contents().strip()
|
|
|
|
|
|
def has_meaningful_body_content(html: str) -> bool:
    """Tell whether *html* holds visible text or media/link markup.

    Returns ``False`` for empty input. Otherwise returns ``True`` when the
    fragment has any visible text or contains an image, anchor, embed or
    object tag.
    """
    if not html:
        return False
    if BeautifulSoup(html, "html.parser").get_text(" ", strip=True):
        return True
    lowered = html.lower()
    return any(marker in lowered for marker in ("<img", "<a ", "<embed", "<object"))
|
|
|
|
|
|
def strip_unwanted(node: BeautifulSoup | Tag) -> None:
    """Remove scripts, styles, frames, forms, navigation and share widgets.

    Matching descendants of *node* are destroyed in place.
    """
    unwanted_selectors = (
        "script",
        "style",
        "noscript",
        "iframe",
        "form",
        "nav",
        ".share",
        ".social-share",
    )
    for selector in unwanted_selectors:
        for match in node.select(selector):
            match.decompose()
|
|
|
|
|
|
# Attributes whose value is interpreted as a URL.
_URL_ATTRIBUTES = frozenset({"href", "src", "action", "formaction", "xlink:href"})

# URL prefixes that can execute script when navigated to or loaded.
_DANGEROUS_URL_PREFIXES = ("javascript:", "vbscript:", "data:text/html")

# Browsers remove ASCII tab/newline characters anywhere in a URL ...
_URL_TAB_OR_NEWLINE = re.compile(r"[\t\n\r]")
# ... and ignore leading control characters and spaces.
_URL_LEADING_NOISE = re.compile(r"^[\s\x00-\x20]+")


def _has_dangerous_scheme(value: str) -> bool:
    """Return True when *value* starts with a script-capable URL prefix.

    The value is normalised first so that obfuscations such as
    ``"jav\\tascript:"`` or a leading control character cannot hide the scheme.
    """
    normalized = _URL_TAB_OR_NEWLINE.sub("", value)
    normalized = _URL_LEADING_NOISE.sub("", normalized).lower()
    return normalized.startswith(_DANGEROUS_URL_PREFIXES)


def strip_dangerous_attributes(node: BeautifulSoup | Tag) -> None:
    """Delete script-bearing attributes from every tag under *node*, in place.

    Removed are: any attribute whose name starts with ``on`` (event
    handlers), ``srcdoc``, and URL attributes (``href``, ``src``, ``action``,
    ``formaction``, ``xlink:href``) whose value uses a ``javascript:``,
    ``vbscript:`` or ``data:text/html`` URL. Attribute names are compared
    case-insensitively.
    """
    for child in node.find_all(True):
        # Iterate over a snapshot because attributes are deleted on the way.
        for attr_name in list(child.attrs):
            normalized_name = attr_name.lower()
            if normalized_name.startswith("on") or normalized_name == "srcdoc":
                del child.attrs[attr_name]
                continue
            if normalized_name not in _URL_ATTRIBUTES:
                continue

            raw_value = child.attrs.get(attr_name)
            if isinstance(raw_value, list):
                # Multi-valued attributes are stored as lists.
                candidate = " ".join(str(item) for item in raw_value)
            else:
                candidate = str(raw_value or "")

            if _has_dangerous_scheme(candidate):
                del child.attrs[attr_name]
|
|
|
|
|
|
def clone_tag(node: Tag) -> BeautifulSoup:
    """Return an independent copy of *node* by re-parsing its markup."""
    markup = str(node)
    return BeautifulSoup(markup, "html.parser")
|
|
|
|
|
|
def find_title_node(soup: BeautifulSoup) -> Tag | None:
    """Return the first heading that looks like the page title, or ``None``.

    Selectors are tried from most to least specific, ending with any ``h1``.
    """
    title_selectors = ("article h1", "h1.entry-title", "h1.page-title", "h1.title", "h1")
    matches = (soup.select_one(selector) for selector in title_selectors)
    return next((node for node in matches if node), None)
|
|
|
|
|
|
def extract_labeled_value(text: str, label: str) -> str:
    """Return the value that follows ``"<label>:"`` in *text*, or ``""``.

    The text is whitespace-normalised before matching. The label is matched
    case-insensitively, and the value runs until the next ``Word:`` label or
    the end of the text.
    """
    if not text:
        return ""

    expression = rf"{re.escape(label)}:\s*(.+?)(?=\s+(?:[A-Z][a-z]+:)|\s{{2,}}|$)"
    found = re.search(expression, clean_text(text), re.IGNORECASE)
    return clean_text(found.group(1)) if found else ""
|
|
|
|
|
|
def normalize_department_category(value: str) -> str:
    """Return *value* as a category name, or ``""`` if it is not plausible.

    Rejected are empty values, values longer than 80 characters or 8 words,
    and values that look like contact details or URLs.
    """
    label = clean_text(value)
    if not label:
        return ""

    too_long = len(label) > 80 or len(label.split()) > 8
    if too_long:
        return ""

    lowered = label.lower()
    rejected_markers = ("p.o. box", "contact us", "@", "http://", "https://")
    if any(marker in lowered for marker in rejected_markers):
        return ""
    return label
|
|
|
|
|
|
def format_error_summary(
    url: str,
    exc: Exception,
    response: requests.Response | None,
    timeout_seconds: int,
) -> str:
    """Build a one-line, human-readable description of a scrape failure.

    Args:
        url: The URL that was requested.
        exc: The exception that ended the scrape.
        response: The response received before the failure, if any.
        timeout_seconds: The request timeout, quoted in timeout messages.

    Returns:
        A message specific to HTTP errors, timeouts and other request
        errors, or ``"<ExceptionType>: <message>"`` for anything else.
    """
    if isinstance(exc, requests.HTTPError):
        # A Response is falsy for 4xx/5xx statuses, so ``or`` would discard
        # exactly the responses this branch reports. Compare against None.
        failing_response = exc.response if exc.response is not None else response
        if failing_response is not None:
            return (
                f"HTTP {failing_response.status_code} {failing_response.reason} "
                f"while fetching {failing_response.url or url}"
            )
    if isinstance(exc, requests.Timeout):
        return f"Request timed out after {timeout_seconds}s while fetching {url}"
    if isinstance(exc, requests.RequestException):
        return f"{type(exc).__name__} while fetching {url}: {exc}"
    return f"{type(exc).__name__}: {exc}"
|
|
|
|
|
|
def format_error_details(
    url: str,
    exc: Exception,
    response: requests.Response | None,
) -> str:
    """Build a multi-line diagnostic report for a scrape failure.

    Args:
        url: The URL that was requested.
        exc: The exception that ended the scrape.
        response: The response received before the failure, if any.

    Returns:
        Newline-separated ``Label: value`` lines: URL, error type and
        message, the HTTP status and resolved URL when a response is
        available, and the formatted exception.
    """
    details = [
        f"URL: {url}",
        f"Error Type: {type(exc).__name__}",
        f"Message: {exc}",
    ]

    # Prefer the response attached to the exception. Compare against None
    # rather than relying on truthiness: a Response is falsy for 4xx/5xx.
    failing_response = getattr(exc, "response", None)
    if failing_response is None:
        failing_response = response
    if failing_response is not None:
        details.append(f"HTTP Status: {failing_response.status_code} {failing_response.reason}")
        details.append(f"Resolved URL: {failing_response.url}")

    trace = "".join(traceback.format_exception_only(type(exc), exc)).strip()
    if trace:
        details.append(f"Exception: {trace}")

    return "\n".join(details)
|