Files
2026-04-09 11:27:13 -07:00

981 lines
33 KiB
Python

from __future__ import annotations
import argparse
import csv
import json
import os
import sys
import time
import xml.etree.ElementTree as ET
from collections import deque
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait
from dataclasses import dataclass
from html.parser import HTMLParser
from pathlib import Path
from typing import Iterable
from urllib.error import HTTPError, URLError
from urllib.parse import urljoin, urlsplit, urlunsplit
from urllib.request import Request, urlopen
if os.name == "nt":
import msvcrt
# User-Agent header value sent with the requests this script makes.
DEFAULT_USER_AGENT = "SitemapBuilder/1.0 (+local script)"
# CSV file name used (inside SCRIPT_DIR) when no output path is given.
DEFAULT_OUTPUT_NAME = "sitemap.csv"
# Suffixes appended to the output file's suffix to derive the state-file and
# log-file paths (see get_state_path / get_log_path).
DEFAULT_STATE_SUFFIX = ".crawlstate.json"
DEFAULT_LOG_SUFFIX = ".crawl.log"
# Default for the --max-pages option.
DEFAULT_MAX_PAGES = 10000
# When a crawl is resumed, the page limit is raised to at least
# "already visited + this increment" (see run_crawl).
DEFAULT_RESUME_PAGE_INCREMENT = 10000
# Default for the --save-every option (pages processed between checkpoints).
DEFAULT_SAVE_EVERY = 25
# Directory that contains this script; default location of the output CSV.
SCRIPT_DIR = Path(__file__).resolve().parent
# URL path suffixes that is_document_url() treats as documents rather than
# crawlable pages. Compared against the lower-cased suffix.
DOCUMENT_EXTENSIONS = {
    ".pdf",
    ".csv",
    ".doc",
    ".docx",
    ".xls",
    ".xlsx",
    ".ppt",
    ".pptx",
    ".txt",
    ".rtf",
    ".zip",
    ".xml",
    ".json",
}
def detect_default_workers() -> int:
    """Return the default worker-thread count for this machine.

    Prefers the number of CPUs in the process's affinity mask (where
    ``os.sched_getaffinity`` exists and succeeds) and otherwise falls back to
    ``os.cpu_count()``. The result is always at least 1.
    """
    usable_cpus = 0
    affinity_fn = getattr(os, "sched_getaffinity", None)
    if callable(affinity_fn):
        try:
            usable_cpus = len(affinity_fn(0))
        except OSError:
            # Affinity could not be queried; fall back to the CPU count.
            usable_cpus = 0
    total_cpus = os.cpu_count() or 1
    return max(usable_cpus if usable_cpus else total_cpus, 1)


# Evaluated once at import time; used as the default for multithreaded crawls.
DEFAULT_WORKERS = detect_default_workers()
@dataclass
class CrawlResult:
    """Outcome of fetching a single URL (produced by fetch_page)."""

    # URL that was requested.
    url: str
    # Raw href values of the <a> tags found on the page (not yet resolved).
    links: list[str]
    # Text of the page's <title>, or "" when none was found.
    title: str = ""
    # Normalized canonical URL declared by the page, or "" when absent.
    canonical_url: str = ""
    # True when the response was not HTML/XHTML and the body was not parsed.
    skipped: bool = False
    # Error description when the fetch failed, otherwise None.
    error: str | None = None
@dataclass
class CrawlState:
    """Mutable crawl state; serialized by save_state and restored by load_state."""

    # Normalized URL the crawl started from.
    start_url: str
    # Whether subdomains of the start host may be visited.
    include_subdomains: bool
    # Whether document links (see DOCUMENT_EXTENSIONS) are recorded.
    include_documents: bool
    # URLs that have been taken off the queue for fetching.
    visited: set[str]
    # Set companion of `queue`, used for membership checks before enqueueing.
    queued: set[str]
    # FIFO queue of URLs still to be fetched.
    queue: deque[str]
    # url -> {"title": ..., "canonical_url": ..., "type": ...} rows for the CSV.
    records: dict[str, dict[str, str]]
    # Fetched URL -> canonical URL declared by that page.
    alias_to_canonical: dict[str, str]
    # One {"url": ..., "error": ...} entry per failed fetch.
    errors: list[dict[str, str]]
    # Number of fetched responses that were skipped as non-HTML.
    skipped_count: int
    # Number of URLs added by XML sitemap discovery.
    discovered_from_sitemaps: int
@dataclass
class RuntimeControl:
    """Pause/stop flags toggled by poll_runtime_control from keyboard input."""

    # True while the user has paused the crawl.
    paused: bool = False
    # True once the user has asked the crawl to stop.
    stop_requested: bool = False
@dataclass
class CrawlRunResult:
    """Summary returned by run_crawl."""

    # Final crawl state after the run.
    state: CrawlState
    # True when the run ended because the user requested a stop.
    user_stopped: bool
    # Paths of the CSV output, the saved state file and the log file.
    output_path: Path
    state_path: Path
    log_path: Path
    # Effective page limit and worker count actually used for the run.
    max_pages: int
    workers: int
class HTMLPageParser(HTMLParser):
    """Collect anchor hrefs, the page title and the canonical link from HTML.

    After ``feed()``:

    * ``links`` holds the raw ``href`` value of every ``<a>`` tag that has one.
    * ``title`` is the text of the first ``<title>`` element only. Later
      ``<title>`` elements (for example the ones inside inline ``<svg>``
      icons) are ignored so they cannot leak into the page title.
    * ``canonical_href`` is the ``href`` of the last ``<link>`` whose ``rel``
      attribute contains the token ``canonical``, or ``""``.
    """

    def __init__(self) -> None:
        super().__init__()
        self.links: list[str] = []
        self.title_parts: list[str] = []
        self.in_title = False
        self.canonical_href = ""
        # Becomes True when the first <title> element is closed.
        self._title_captured = False

    def handle_starttag(self, tag: str, attrs: list[tuple[str, str | None]]) -> None:
        """Record links, the start of the title and canonical link tags."""
        attrs_map = {key.lower(): value for key, value in attrs}
        lower_tag = tag.lower()
        if lower_tag == "a":
            href = attrs_map.get("href")
            if href:
                self.links.append(href)
        elif lower_tag == "title":
            if not self._title_captured:
                self.in_title = True
        elif lower_tag == "link":
            # rel is a whitespace-separated token list; match whole tokens so
            # that values such as "noncanonical" are not mistaken for it.
            rel_tokens = (attrs_map.get("rel") or "").lower().split()
            href = attrs_map.get("href") or ""
            if "canonical" in rel_tokens and href:
                self.canonical_href = href

    def handle_endtag(self, tag: str) -> None:
        """Close the title capture when the first <title> element ends."""
        if tag.lower() == "title" and self.in_title:
            self.in_title = False
            self._title_captured = True

    def handle_data(self, data: str) -> None:
        """Accumulate text that appears inside the captured <title>."""
        if self.in_title:
            self.title_parts.append(data)

    @property
    def title(self) -> str:
        """Title text with each fragment stripped and joined by single spaces."""
        return " ".join(part.strip() for part in self.title_parts if part.strip()).strip()
def normalize_url(url: str) -> str:
    """Return a canonical form of *url* for de-duplication.

    Surrounding whitespace is stripped, the scheme (defaulting to ``https``)
    and the network location are lower-cased, the fragment is dropped and
    trailing slashes are removed from the path. An empty path, or a path made
    only of slashes, becomes ``/`` so that ``https://host``, ``https://host/``
    and ``https://host//`` all normalize to the same URL.
    """
    parts = urlsplit(url.strip())
    scheme = parts.scheme.lower() or "https"
    netloc = parts.netloc.lower()
    # rstrip leaves "" for "", "/" and "//"; fall back to the root path.
    path = parts.path.rstrip("/") or "/"
    return urlunsplit((scheme, netloc, path, parts.query, ""))
def is_http_url(url: str) -> bool:
    """Return True when *url* uses the http or https scheme."""
    scheme = urlsplit(url).scheme
    return scheme == "http" or scheme == "https"
def build_allowed_hosts(start_url: str) -> set[str]:
    """Return a one-element set holding the lower-cased host of *start_url*."""
    start_host = urlsplit(start_url).netloc
    return {start_host.lower()}
def should_visit(url: str, allowed_hosts: set[str], include_subdomains: bool) -> bool:
    """Decide whether *url* belongs to the site being crawled.

    Only http/https URLs qualify. The host must be one of *allowed_hosts*, or,
    when *include_subdomains* is true, a subdomain of one of them.
    """
    parts = urlsplit(url)
    if parts.scheme not in {"http", "https"}:
        return False
    host = parts.netloc.lower()
    if not include_subdomains:
        return host in allowed_hosts
    for allowed in allowed_hosts:
        if host == allowed or host.endswith(f".{allowed}"):
            return True
    return False
def is_document_url(url: str) -> bool:
    """Return True when the URL path ends in one of DOCUMENT_EXTENSIONS."""
    extension = Path(urlsplit(url).path).suffix
    return extension.lower() in DOCUMENT_EXTENSIONS
def should_record_url(url: str) -> bool:
    """Return False for URLs whose whole query string is ``page=1``.

    The comparison is case-insensitive; every other URL is recorded.
    """
    query_string = urlsplit(url).query
    if query_string.lower() == "page=1":
        return False
    return True
def get_state_path(output_path: Path) -> Path:
    """Return the crawl-state file path that belongs to *output_path*.

    DEFAULT_STATE_SUFFIX is appended after the output file's existing suffix.
    """
    combined_suffix = output_path.suffix + DEFAULT_STATE_SUFFIX
    return output_path.with_suffix(combined_suffix)
def get_log_path(output_path: Path) -> Path:
    """Return the crawl-log file path that belongs to *output_path*.

    DEFAULT_LOG_SUFFIX is appended after the output file's existing suffix.
    """
    combined_suffix = output_path.suffix + DEFAULT_LOG_SUFFIX
    return output_path.with_suffix(combined_suffix)
def cleanup_run_files(output_path: Path) -> list[Path]:
    """Delete the output CSV, state file and log file of a run, if present.

    Returns:
        The paths that existed and were removed, in that order.
    """
    base = Path(output_path)
    candidates = (base, get_state_path(base), get_log_path(base))
    removed: list[Path] = []
    for candidate in candidates:
        if not candidate.exists():
            continue
        candidate.unlink()
        removed.append(candidate)
    return removed
def log_message(log_path: Path, message: str) -> None:
    """Append a timestamped line to the log file, creating its folder if needed."""
    log_path.parent.mkdir(parents=True, exist_ok=True)
    stamp = time.strftime("%Y-%m-%d %H:%M:%S")
    entry = f"[{stamp}] {message}\n"
    with log_path.open("a", encoding="utf-8") as handle:
        handle.write(entry)
def resolve_alias(url: str, alias_to_canonical: dict[str, str]) -> str:
    """Follow the alias chain starting at *url* and return where it ends.

    Resolution stops at the first URL that has no alias entry, or as soon as
    a URL repeats, so cyclic alias chains terminate.
    """
    followed: set[str] = set()
    current = url
    while True:
        if current in followed or current not in alias_to_canonical:
            return current
        followed.add(current)
        current = alias_to_canonical[current]
def register_record(
    state: CrawlState,
    url: str,
    record_type: str,
    title: str = "",
    canonical_url: str = "",
) -> None:
    """Create or update the CSV record for *url* in ``state.records``.

    URLs rejected by should_record_url are ignored. For an existing record the
    type is only filled in when empty or upgraded from "document" to "page";
    title and canonical URL are only filled in when the record has none yet.
    """
    if not should_record_url(url):
        return

    if url in state.records:
        record = state.records[url]
    else:
        record = {"title": "", "canonical_url": "", "type": record_type}

    known_type = record.get("type")
    if not known_type:
        record["type"] = record_type
    elif known_type == "document" and record_type == "page":
        record["type"] = "page"

    if title and not record.get("title"):
        record["title"] = title
    if canonical_url and not record.get("canonical_url"):
        record["canonical_url"] = canonical_url

    # Records may lack these keys entirely; make sure both exist.
    record.setdefault("canonical_url", canonical_url)
    record.setdefault("title", title)

    state.records[url] = record
def save_state(state: CrawlState, state_path: Path, output_path: Path) -> None:
    """Persist the crawl state as JSON at *state_path*.

    The JSON is written to a sibling ``<name>.tmp`` file first and then moved
    over *state_path* with ``os.replace``, so an interrupted save cannot leave
    a truncated state file that would make the crawl impossible to resume.

    Args:
        state: Crawl state to serialize. Sets are stored as sorted lists and
            the queue as a list in queue order.
        state_path: Destination file; its parent folder is created if needed.
        output_path: CSV output path, stored in the payload for reference.
    """
    state_path.parent.mkdir(parents=True, exist_ok=True)
    payload = {
        "start_url": state.start_url,
        "include_subdomains": state.include_subdomains,
        "include_documents": state.include_documents,
        "visited": sorted(state.visited),
        "queued": sorted(state.queued),
        "queue": list(state.queue),
        "records": state.records,
        "alias_to_canonical": state.alias_to_canonical,
        "errors": state.errors,
        "skipped_count": state.skipped_count,
        "discovered_from_sitemaps": state.discovered_from_sitemaps,
        "saved_at": time.strftime("%Y-%m-%d %H:%M:%S"),
        "output_path": str(output_path),
    }
    # Serialize before touching the disk so a serialization error cannot
    # damage the previous state file.
    serialized = json.dumps(payload, indent=2)
    temp_path = state_path.with_name(state_path.name + ".tmp")
    try:
        temp_path.write_text(serialized, encoding="utf-8")
        os.replace(temp_path, state_path)
    except BaseException:
        # Do not leave a half-written temp file behind.
        if temp_path.exists():
            temp_path.unlink()
        raise
def load_state(state_path: Path) -> CrawlState:
    """Read a saved crawl state from *state_path*.

    ``start_url`` is required; every other field falls back to an empty
    container, ``False`` or ``0`` when it is missing from the file.
    """
    raw_text = state_path.read_text(encoding="utf-8")
    payload = json.loads(raw_text)
    field = payload.get
    return CrawlState(
        start_url=payload["start_url"],
        include_subdomains=bool(field("include_subdomains", False)),
        include_documents=bool(field("include_documents", False)),
        visited=set(field("visited", [])),
        queued=set(field("queued", [])),
        queue=deque(field("queue", [])),
        records=dict(field("records", {})),
        alias_to_canonical=dict(field("alias_to_canonical", {})),
        errors=list(field("errors", [])),
        skipped_count=int(field("skipped_count", 0)),
        discovered_from_sitemaps=int(field("discovered_from_sitemaps", 0)),
    )
def initialize_state(start_url: str, include_subdomains: bool, include_documents: bool) -> CrawlState:
    """Build the state for a brand-new crawl.

    The normalized start URL is the only queued entry; all other collections
    start empty and both counters start at zero.
    """
    seed = normalize_url(start_url)
    return CrawlState(
        start_url=seed,
        include_subdomains=include_subdomains,
        include_documents=include_documents,
        visited=set(),
        queued={seed},
        queue=deque((seed,)),
        records={},
        alias_to_canonical={},
        errors=[],
        skipped_count=0,
        discovered_from_sitemaps=0,
    )
def prompt_if_missing(value: str | None, prompt_text: str) -> str:
    """Return *value* when it is non-empty, otherwise ask the user for it.

    The interactive answer is returned with surrounding whitespace stripped.
    """
    return value if value else input(prompt_text).strip()
def prompt_yes_no(prompt_text: str, default: bool) -> bool:
    """Ask a yes/no question on the console.

    An empty answer returns *default*. Otherwise only "y" or "yes"
    (case-insensitive) count as yes; anything else is treated as no.
    """
    hint = "Y/n" if default else "y/N"
    reply = input(f"{prompt_text} [{hint}]: ").strip().lower()
    if reply == "":
        return default
    return reply == "y" or reply == "yes"
def write_csv(records: dict[str, dict[str, str]], output_path: Path) -> None:
    """Write all records to *output_path* as CSV, one row per URL.

    Rows are ordered by URL; missing record fields are written as empty
    strings. The parent folder is created if it does not exist.
    """
    output_path.parent.mkdir(parents=True, exist_ok=True)
    header = ["URL", "Title", "Canonical URL", "Type"]
    with output_path.open("w", newline="", encoding="utf-8") as handle:
        out = csv.writer(handle)
        out.writerow(header)
        out.writerows(
            [
                url,
                records[url].get("title", ""),
                records[url].get("canonical_url", ""),
                records[url].get("type", ""),
            ]
            for url in sorted(records)
        )
def fetch_text(url: str, timeout: float, user_agent: str, accept: str) -> tuple[str | None, str | None]:
    """Download *url* and return ``(text, None)`` or ``(None, error_message)``.

    The body is decoded with the charset from the response headers (UTF-8
    when none is declared), replacing undecodable bytes. No exception derived
    from ``Exception`` escapes; failures are reported through the second
    tuple element.
    """
    headers = {"User-Agent": user_agent, "Accept": accept}
    request = Request(url, headers=headers)
    try:
        with urlopen(request, timeout=timeout) as response:
            payload = response.read()
            charset = response.headers.get_content_charset() or "utf-8"
            text = payload.decode(charset, errors="replace")
    except HTTPError as exc:
        return None, f"HTTP {exc.code}"
    except URLError as exc:
        return None, str(exc.reason)
    except TimeoutError:
        return None, "request timed out"
    except Exception as exc:  # pragma: no cover
        return None, str(exc)
    return text, None
def fetch_page(url: str, timeout: float, user_agent: str) -> CrawlResult:
    """Fetch *url* and extract its links, title and canonical URL.

    Returns:
        A CrawlResult with ``error`` set when the request fails, with
        ``skipped=True`` when the response Content-Type is neither HTML nor
        XHTML, and otherwise with the parsed links, title and normalized
        canonical URL.
    """

    def failure(reason: str) -> CrawlResult:
        return CrawlResult(url=url, links=[], error=reason)

    accept = "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8"
    request = Request(url, headers={"User-Agent": user_agent, "Accept": accept})
    try:
        with urlopen(request, timeout=timeout) as response:
            content_type = response.headers.get("Content-Type", "").lower()
            looks_like_html = any(
                marker in content_type for marker in ("text/html", "application/xhtml+xml")
            )
            if not looks_like_html:
                # Body is not read for non-HTML responses.
                return CrawlResult(url=url, links=[], skipped=True)
            payload = response.read()
            charset = response.headers.get_content_charset() or "utf-8"
            content = payload.decode(charset, errors="replace")
    except HTTPError as exc:
        return failure(f"HTTP {exc.code}")
    except URLError as exc:
        return failure(str(exc.reason))
    except TimeoutError:
        return failure("request timed out")
    except Exception as exc:  # pragma: no cover
        return failure(str(exc))

    page = HTMLPageParser()
    page.feed(content)
    canonical_url = ""
    if page.canonical_href:
        # The canonical href may be relative; resolve it against the page URL.
        canonical_url = normalize_url(urljoin(url, page.canonical_href))
    return CrawlResult(
        url=url,
        links=page.links,
        title=page.title,
        canonical_url=canonical_url,
    )
def fetch_page_with_delay(url: str, timeout: float, user_agent: str, delay: float) -> CrawlResult:
    """Sleep for *delay* seconds (when positive), then fetch *url*."""
    wants_pause = delay > 0
    if wants_pause:
        time.sleep(delay)
    return fetch_page(url, timeout=timeout, user_agent=user_agent)
def print_progress(state: CrawlState, max_pages: int, current_url: str) -> None:
    """Print a one-line progress report for the URL about to be fetched."""
    visited_count = len(state.visited)
    record_count = len(state.records)
    backlog = len(state.queue)
    print(
        f"[{visited_count}/{max_pages}] Found {record_count} URL(s), "
        f"queued {backlog} more: {current_url}"
    )
def poll_runtime_control(control: RuntimeControl, log_path: Path) -> None:
    """Apply any pending key presses to *control* (Windows only).

    P pauses, R resumes a paused crawl and Q requests a stop; each effective
    key press is logged. On platforms other than Windows this does nothing.
    """
    if os.name != "nt":
        return
    while msvcrt.kbhit():
        pressed = msvcrt.getwch().lower()
        if pressed == "q":
            control.stop_requested = True
            log_message(log_path, "Stop requested by user")
        elif pressed == "p":
            if not control.paused:
                control.paused = True
                print("Paused. Press R to resume or Q to stop.")
                log_message(log_path, "Crawl paused by user")
        elif pressed == "r":
            if control.paused:
                control.paused = False
                print("Resuming crawl.")
                log_message(log_path, "Crawl resumed by user")
def discover_robots_sitemaps(
    start_url: str,
    timeout: float,
    user_agent: str,
    log_path: Path,
) -> set[str]:
    """Return the normalized sitemap URLs advertised in the site's robots.txt.

    ``Sitemap:`` lines are matched case-insensitively. Leading whitespace, a
    leading byte-order mark, whitespace before the colon and trailing ``#``
    comments are tolerated. When robots.txt cannot be fetched the failure is
    logged and an empty set is returned.
    """
    robots_url = normalize_url(urljoin(start_url, "/robots.txt"))
    content, error = fetch_text(robots_url, timeout, user_agent, "text/plain,*/*;q=0.8")
    # fetch_text may report a failure with an empty message, so also check
    # that content was actually returned before using it.
    if error or content is None:
        log_message(log_path, f"robots.txt not available at {robots_url}: {error}")
        return set()

    sitemap_urls: set[str] = set()
    for line in content.splitlines():
        # Drop a BOM and any trailing comment before looking at the field name.
        directive = line.lstrip("\ufeff").split("#", 1)[0].strip()
        field, separator, value = directive.partition(":")
        if not separator or field.strip().lower() != "sitemap":
            continue
        raw_url = value.strip()
        if raw_url:
            sitemap_urls.add(normalize_url(raw_url))

    if sitemap_urls:
        log_message(log_path, f"Discovered {len(sitemap_urls)} sitemap reference(s) from robots.txt")
    return sitemap_urls
def xml_local_name(tag: str) -> str:
    """Return *tag* without its ``{namespace}`` prefix, if it has one."""
    return tag.rsplit("}", 1)[-1]
def _iter_entry_locations(root: ET.Element, entry_name: str) -> Iterable[str]:
    """Yield the stripped ``<loc>`` text of each direct *entry_name* child of *root*."""
    for entry in root:
        if xml_local_name(entry.tag) != entry_name:
            continue
        for child in entry:
            if xml_local_name(child.tag) != "loc" or not child.text:
                continue
            location = child.text.strip()
            if location:
                yield location


def parse_sitemap_urls(
    sitemap_url: str,
    allowed_hosts: set[str],
    include_subdomains: bool,
    timeout: float,
    user_agent: str,
    log_path: Path,
    seen_sitemaps: set[str],
) -> set[str]:
    """Collect the page URLs listed in an XML sitemap or sitemap index.

    Only ``<loc>`` elements that are direct children of ``<url>`` entries (or
    of ``<sitemap>`` entries in an index) are used. ``<loc>`` elements nested
    inside extension elements, such as ``<image:loc>`` in image sitemaps, are
    ignored so media files are not queued as pages. Sitemap indexes are
    followed recursively.

    Args:
        sitemap_url: Sitemap to read.
        allowed_hosts: Hosts that belong to the crawl.
        include_subdomains: Whether subdomains of the allowed hosts qualify.
        timeout: Request timeout in seconds.
        user_agent: User-Agent header value.
        log_path: Log file for fetch and parse failures.
        seen_sitemaps: Sitemaps already handled; updated in place so that a
            sitemap is fetched at most once and index cycles terminate.

    Returns:
        Normalized URLs that pass should_visit. Empty when the sitemap was
        already seen, is off-site, cannot be fetched or cannot be parsed.
    """
    normalized_sitemap = normalize_url(sitemap_url)
    if normalized_sitemap in seen_sitemaps:
        return set()
    seen_sitemaps.add(normalized_sitemap)
    if not should_visit(normalized_sitemap, allowed_hosts, include_subdomains):
        return set()

    content, error = fetch_text(normalized_sitemap, timeout, user_agent, "application/xml,text/xml;q=0.9,*/*;q=0.8")
    # A failure may carry an empty message; never hand None to the XML parser.
    if error or content is None:
        log_message(log_path, f"Sitemap fetch failed for {normalized_sitemap}: {error}")
        return set()

    # NOTE(review): the sitemap is remote, untrusted input and ElementTree is
    # not hardened against maliciously constructed XML (see the "XML
    # vulnerabilities" section of the Python docs). Consider a hardened
    # parser if hostile sites are in scope.
    try:
        root = ET.fromstring(content)
    except ET.ParseError as exc:
        log_message(log_path, f"Sitemap parse failed for {normalized_sitemap}: {exc}")
        return set()

    tag_name = xml_local_name(root.tag)
    discovered_urls: set[str] = set()
    if tag_name == "urlset":
        for location in _iter_entry_locations(root, "url"):
            normalized = normalize_url(location)
            if should_visit(normalized, allowed_hosts, include_subdomains):
                discovered_urls.add(normalized)
    elif tag_name == "sitemapindex":
        for location in _iter_entry_locations(root, "sitemap"):
            discovered_urls.update(
                parse_sitemap_urls(
                    normalize_url(location),
                    allowed_hosts,
                    include_subdomains,
                    timeout,
                    user_agent,
                    log_path,
                    seen_sitemaps,
                )
            )
    else:
        log_message(log_path, f"Unsupported sitemap format at {normalized_sitemap}")
    return discovered_urls
def seed_from_xml_sitemaps(
    state: CrawlState,
    timeout: float,
    user_agent: str,
    log_path: Path,
) -> None:
    """Seed the crawl queue and records from the site's XML sitemaps.

    Candidates are the sitemaps named in robots.txt plus ``/sitemap.xml``.
    Page URLs are recorded and queued when not already visited or queued;
    document URLs are recorded only when ``state.include_documents`` is set.
    The number of additions is added to ``state.discovered_from_sitemaps``
    and logged.
    """
    allowed_hosts = build_allowed_hosts(state.start_url)
    candidates = discover_robots_sitemaps(state.start_url, timeout, user_agent, log_path)
    candidates.add(normalize_url(urljoin(state.start_url, "/sitemap.xml")))

    seen_sitemaps: set[str] = set()
    found: set[str] = set()
    for candidate in candidates:
        found |= parse_sitemap_urls(
            candidate,
            allowed_hosts,
            state.include_subdomains,
            timeout,
            user_agent,
            log_path,
            seen_sitemaps,
        )

    added = 0
    for url in found:
        target = resolve_alias(url, state.alias_to_canonical)
        if not is_document_url(target):
            register_record(state, target, "page")
            already_known = target in state.visited or target in state.queued
            if not already_known:
                state.queue.append(target)
                state.queued.add(target)
                added += 1
        elif state.include_documents:
            # Documents are recorded but never queued for fetching.
            register_record(state, target, "document")
            added += 1

    state.discovered_from_sitemaps += added
    log_message(log_path, f"Added {added} URL(s) from XML sitemap discovery")
def _enqueue_if_new(state: CrawlState, url: str) -> None:
    """Queue *url* unless it has already been visited or queued."""
    if url in state.visited or url in state.queued:
        return
    state.queue.append(url)
    state.queued.add(url)


def process_crawl_result(
    state: CrawlState,
    result: CrawlResult,
    allowed_hosts: set[str],
    log_path: Path,
) -> None:
    """Fold one fetch result into the crawl state.

    Errors are appended to ``state.errors`` and logged. Skipped (non-HTML)
    responses bump ``state.skipped_count``. For parsed pages the canonical
    URL (when on-site) is stored as an alias target, recorded and queued, the
    page itself is recorded, and every on-site link is recorded and, unless
    it is a document, queued.
    """
    if result.error:
        state.errors.append({"url": result.url, "error": result.error})
        log_message(log_path, f"Error fetching {result.url}: {result.error}")
        return

    if result.skipped:
        state.skipped_count += 1
        # NOTE(review): crawl_site registers every URL as "page" before it is
        # fetched and register_record never turns "page" into "document", so
        # this call appears not to change the stored type - confirm intent.
        register_record(state, result.url, "document")
        return

    canonical_url = ""
    declared = result.canonical_url
    if declared and should_visit(declared, allowed_hosts, state.include_subdomains):
        canonical_url = resolve_alias(declared, state.alias_to_canonical)
        state.alias_to_canonical[result.url] = canonical_url
        register_record(state, canonical_url, "page", title=result.title, canonical_url=canonical_url)
        _enqueue_if_new(state, canonical_url)
    register_record(state, result.url, "page", title=result.title, canonical_url=canonical_url)

    for href in result.links:
        # Links are resolved relative to the URL that was requested.
        target = normalize_url(urljoin(result.url, href))
        if not should_visit(target, allowed_hosts, state.include_subdomains):
            continue
        target = resolve_alias(target, state.alias_to_canonical)
        if is_document_url(target):
            if state.include_documents:
                register_record(state, target, "document")
            continue
        register_record(state, target, "page")
        _enqueue_if_new(state, target)
def _save_progress(state: CrawlState, state_path: Path, output_path: Path, log_path: Path) -> None:
    """Write the CSV and the state file, then log the checkpoint."""
    write_csv(state.records, output_path)
    save_state(state, state_path, output_path)
    log_message(log_path, f"Saved progress after {len(state.visited)} visited page(s)")


def _claim_next_url(state: CrawlState, max_pages: int) -> str | None:
    """Pop one queue entry and mark it visited; return None if already visited."""
    current = resolve_alias(state.queue.popleft(), state.alias_to_canonical)
    state.queued.discard(current)
    if current in state.visited:
        return None
    state.visited.add(current)
    register_record(state, current, "page")
    print_progress(state, max_pages, current)
    return current


def _collect_finished(
    state: CrawlState,
    pending: dict[object, str],
    allowed_hosts: set[str],
    log_path: Path,
    *,
    block_until_all: bool,
) -> int:
    """Process finished fetches and return how many were handled.

    Waits up to 0.2 s for the first completion, or, with *block_until_all*,
    until every pending fetch is done.
    """
    if block_until_all:
        finished, _ = wait(pending.keys())
    else:
        finished, _ = wait(pending.keys(), timeout=0.2, return_when=FIRST_COMPLETED)
    for future in finished:
        pending.pop(future, None)
        outcome = future.result()
        process_crawl_result(state, outcome, allowed_hosts, log_path)
    return len(finished)


def crawl_site(
    state: CrawlState,
    max_pages: int,
    delay: float,
    timeout: float,
    user_agent: str,
    state_path: Path,
    output_path: Path,
    log_path: Path,
    save_every: int,
    workers: int,
) -> tuple[CrawlState, bool]:
    """Crawl queued URLs until the queue is empty, the limit is hit or the user stops.

    With ``workers <= 1`` pages are fetched one at a time; otherwise up to
    *workers* fetches run in a thread pool while results are processed on the
    calling thread. Progress is checkpointed after every *save_every*
    processed pages and once more before returning.

    Returns:
        The (mutated) state and whether the user requested the stop.
    """
    allowed_hosts = build_allowed_hosts(state.start_url)
    control = RuntimeControl()
    unsaved = 0
    user_stopped = False

    if workers <= 1:
        while state.queue and len(state.visited) < max_pages:
            poll_runtime_control(control, log_path)
            # Idle here while paused; a stop request also ends the pause.
            while control.paused and not control.stop_requested:
                time.sleep(0.2)
                poll_runtime_control(control, log_path)
            if control.stop_requested:
                user_stopped = True
                print("Stop requested. Saving progress and finishing cleanly...")
                break

            current = _claim_next_url(state, max_pages)
            if current is None:
                continue
            result = fetch_page_with_delay(current, timeout=timeout, user_agent=user_agent, delay=delay)
            process_crawl_result(state, result, allowed_hosts, log_path)
            unsaved += 1
            if unsaved >= save_every:
                _save_progress(state, state_path, output_path, log_path)
                unsaved = 0
    else:
        with ThreadPoolExecutor(max_workers=workers) as executor:
            pending: dict[object, str] = {}
            while pending or (state.queue and len(state.visited) < max_pages):
                poll_runtime_control(control, log_path)
                if control.stop_requested:
                    user_stopped = True
                    print("Stop requested. No new pages will be queued. Waiting for active requests to finish...")
                    break

                if not control.paused:
                    # Top up the pool with new fetches.
                    while state.queue and len(pending) < workers and len(state.visited) < max_pages:
                        current = _claim_next_url(state, max_pages)
                        if current is None:
                            continue
                        future = executor.submit(fetch_page_with_delay, current, timeout, user_agent, delay)
                        pending[future] = current
                    if not pending:
                        continue
                    unsaved += _collect_finished(
                        state, pending, allowed_hosts, log_path, block_until_all=False
                    )
                elif pending:
                    # Paused: submit nothing new but keep draining results.
                    unsaved += _collect_finished(
                        state, pending, allowed_hosts, log_path, block_until_all=False
                    )
                else:
                    time.sleep(0.2)

                if unsaved >= save_every:
                    _save_progress(state, state_path, output_path, log_path)
                    unsaved = 0

            if user_stopped and pending:
                # Let in-flight requests finish so their results are kept.
                _collect_finished(state, pending, allowed_hosts, log_path, block_until_all=True)

    write_csv(state.records, output_path)
    save_state(state, state_path, output_path)
    log_message(log_path, f"Final save completed with {len(state.records)} URL(s) recorded")
    return state, user_stopped
def parse_args() -> argparse.Namespace:
    """Define the command-line interface and parse ``sys.argv``."""
    cli = argparse.ArgumentParser(
        description="Crawl a website and export discovered internal URLs to a CSV sitemap.",
    )
    cli.add_argument("url", nargs="?", help="Starting URL to crawl, for example https://example.com")
    cli.add_argument(
        "-o",
        "--output",
        help=f"Output CSV path. Defaults to {DEFAULT_OUTPUT_NAME} in the script folder.",
    )
    cli.add_argument(
        "--max-pages",
        type=int,
        default=DEFAULT_MAX_PAGES,
        help=f"Maximum number of pages to crawl before stopping. Default: {DEFAULT_MAX_PAGES}",
    )
    cli.add_argument(
        "--delay",
        type=float,
        default=0.0,
        help="Delay in seconds between requests. Default: 0",
    )
    cli.add_argument(
        "--timeout",
        type=float,
        default=15.0,
        help="Request timeout in seconds. Default: 15",
    )

    scope_switches = (
        ("--include-subdomains", "Also crawl subdomains of the starting host."),
        (
            "--include-documents",
            "Include document links like PDF, CSV, DOC, and DOCX in the sitemap output.",
        ),
    )
    for flag, description in scope_switches:
        cli.add_argument(flag, action="store_true", help=description)

    cli.add_argument(
        "--save-every",
        type=int,
        default=DEFAULT_SAVE_EVERY,
        help=f"Save progress after this many pages. Default: {DEFAULT_SAVE_EVERY}",
    )

    state_switches = (
        ("--resume", "Resume from the saved crawl state if a state file already exists."),
        ("--fresh", "Ignore any saved crawl state and start over."),
    )
    for flag, description in state_switches:
        cli.add_argument(flag, action="store_true", help=description)

    # 0 (the default) means "not specified"; main() then asks interactively.
    cli.add_argument(
        "--workers",
        type=int,
        default=0,
        help=f"Number of worker threads. Use 1 to disable multithreading. Default when prompted on: {DEFAULT_WORKERS}",
    )
    return cli.parse_args()
def run_crawl(
    *,
    start_url: str,
    output_path: Path,
    max_pages: int = DEFAULT_MAX_PAGES,
    delay: float = 0.0,
    timeout: float = 15.0,
    include_subdomains: bool = False,
    include_documents: bool = False,
    save_every: int = DEFAULT_SAVE_EVERY,
    workers: int = DEFAULT_WORKERS,
    resume: bool = True,
    fresh: bool = False,
    user_agent: str = DEFAULT_USER_AGENT,
) -> CrawlRunResult:
    """Run a complete crawl (new or resumed) and return a summary.

    A saved state is loaded when the state file exists, *resume* is true and
    *fresh* is false; it must match the requested start URL, document setting
    and subdomain setting. A crawl without visited pages is first seeded from
    XML sitemaps. When resuming, the page limit is raised to at least the
    visited count plus DEFAULT_RESUME_PAGE_INCREMENT.

    Raises:
        ValueError: If the start URL is missing or not http/https, or if the
            saved state does not match the requested settings.
    """
    if not start_url:
        raise ValueError("A starting URL is required.")
    if "://" not in start_url:
        start_url = f"https://{start_url}"
    normalized_start = normalize_url(start_url)
    if not is_http_url(normalized_start):
        raise ValueError("Only http and https URLs are supported.")

    output_path = Path(output_path)
    state_path = get_state_path(output_path)
    log_path = get_log_path(output_path)

    state: CrawlState
    should_resume = state_path.exists() and not fresh and resume
    if not should_resume:
        state = initialize_state(normalized_start, include_subdomains, include_documents)
    else:
        state = load_state(state_path)
        # Checked in this order; the first mismatch is reported.
        mismatches = (
            (
                state.start_url != normalized_start,
                "The saved crawl state belongs to a different starting URL. "
                "Use a different output name or start a fresh crawl.",
            ),
            (
                state.include_documents != include_documents,
                "The saved crawl state uses a different document setting. "
                "Keep the same choice or start a fresh crawl.",
            ),
            (
                state.include_subdomains != include_subdomains,
                "The saved crawl state uses a different subdomain setting. "
                "Keep the same choice or start a fresh crawl.",
            ),
        )
        for mismatched, message in mismatches:
            if mismatched:
                raise ValueError(message)

    effective_workers = max(int(workers), 1)
    effective_max_pages = max(int(max_pages), 1)
    if state.visited:
        effective_max_pages = max(effective_max_pages, len(state.visited) + DEFAULT_RESUME_PAGE_INCREMENT)
    else:
        seed_from_xml_sitemaps(state, max(timeout, 1.0), user_agent, log_path)

    for line in (
        f"Starting crawl for {state.start_url}",
        f"Output CSV: {output_path.resolve()}",
        f"State file: {state_path.resolve()}",
        f"Multithreading workers: {effective_workers}",
        f"Include documents: {state.include_documents}",
    ):
        log_message(log_path, line)

    state, user_stopped = crawl_site(
        state=state,
        max_pages=effective_max_pages,
        delay=max(delay, 0.0),
        timeout=max(timeout, 1.0),
        user_agent=user_agent,
        state_path=state_path,
        output_path=output_path,
        log_path=log_path,
        save_every=max(save_every, 1),
        workers=effective_workers,
    )

    if user_stopped:
        outcome = "Crawl stopped by user"
    elif not state.queue:
        outcome = "Crawl completed with empty queue"
    elif len(state.visited) >= effective_max_pages:
        outcome = "Crawl stopped at max page limit"
    else:
        outcome = "Crawl stopped before queue emptied"
    log_message(log_path, outcome)

    return CrawlRunResult(
        state=state,
        user_stopped=user_stopped,
        output_path=output_path,
        state_path=state_path,
        log_path=log_path,
        max_pages=effective_max_pages,
        workers=effective_workers,
    )
def main() -> int:
    """Interactive command-line entry point.

    Collects any missing settings from the user, runs the crawl through
    run_crawl and prints a summary.

    Returns:
        Process exit code: 0 on success, 1 for invalid input or a saved state
        that does not match the requested settings.
    """
    args = parse_args()
    start_url = prompt_if_missing(args.url, "Enter the website URL to crawl: ")
    if not start_url:
        print("A starting URL is required.", file=sys.stderr)
        return 1
    if "://" not in start_url:
        start_url = f"https://{start_url}"
    normalized_start = normalize_url(start_url)
    if not is_http_url(normalized_start):
        print("Only http and https URLs are supported.", file=sys.stderr)
        return 1

    output_value = prompt_if_missing(args.output, f"Enter output CSV path [{DEFAULT_OUTPUT_NAME}]: ")
    output_path = Path(output_value) if output_value else SCRIPT_DIR / DEFAULT_OUTPUT_NAME
    state_path = get_state_path(output_path)
    log_path = get_log_path(output_path)

    include_documents = args.include_documents or prompt_yes_no(
        "Include document links such as PDF, CSV, DOC, and DOCX in the sitemap?",
        default=False,
    )
    workers = args.workers
    if workers <= 0:
        enable_multithreading = prompt_yes_no(
            f"Enable multithreading for faster scanning? {DEFAULT_WORKERS} worker threads will be used.",
            default=True,
        )
        workers = DEFAULT_WORKERS if enable_multithreading else 1

    print(f"Crawling {normalized_start}")
    print(f"Output file: {output_path.resolve()}")
    print(f"State file: {state_path.resolve()}")
    print(f"Log file: {log_path.resolve()}")

    resume_existing = False
    if state_path.exists() and not args.fresh:
        resume_existing = args.resume or prompt_yes_no(
            f"Found saved crawl state at {state_path.name}. Resume from where it left off?",
            default=True,
        )

    # Show the keyboard controls before the crawl starts: run_crawl blocks
    # until the crawl is over, so the hint is useless afterwards.
    if os.name == "nt":
        print("Press P to pause, R to resume, or Q to stop cleanly and save progress.")

    try:
        run_result = run_crawl(
            start_url=normalized_start,
            output_path=output_path,
            max_pages=args.max_pages,
            delay=args.delay,
            timeout=args.timeout,
            include_subdomains=args.include_subdomains,
            include_documents=include_documents,
            save_every=args.save_every,
            workers=workers,
            resume=resume_existing,
            fresh=args.fresh,
            user_agent=DEFAULT_USER_AGENT,
        )
    except ValueError as exc:
        print(str(exc), file=sys.stderr)
        return 1

    state = run_result.state
    user_stopped = run_result.user_stopped
    effective_max_pages = run_result.max_pages

    print(f"Max pages: {effective_max_pages}")
    print(f"Include documents: {'Yes' if state.include_documents else 'No'}")
    print(f"Multithreading: {'Yes' if run_result.workers > 1 else 'No'}")
    print(f"Worker threads: {run_result.workers}")
    if resume_existing:
        print("Resumed from the existing crawl state file.")
        log_message(log_path, "Resumed from existing crawl state")

    print(f"Found {len(state.records)} unique URL(s).")
    print(f"Visited pages: {len(state.visited)}")
    print(f"Queued pages remaining: {len(state.queue)}")
    print(f"URLs added from XML sitemaps: {state.discovered_from_sitemaps}")
    if state.errors:
        print(f"Pages with errors: {len(state.errors)}")
        # Only the first ten errors are shown on the console.
        for result in state.errors[:10]:
            print(f" {result['url']} -> {result['error']}")
    if state.skipped_count:
        print(f"Non-HTML pages skipped while crawling: {state.skipped_count}")

    # run_crawl has already written the outcome to the log file, so it is
    # only reported on the console here (logging it again duplicated lines).
    if user_stopped:
        print("Stopped by user. Run it again to continue from the saved state.")
    elif state.queue and len(state.visited) >= effective_max_pages:
        print("Stopped because the max page limit was reached. Run it again to continue.")
    elif state.queue:
        print("Stopped before the queue was empty. Run it again to continue.")
    else:
        print("Crawl complete. No queued pages remain.")
    print("Done.")
    return 0
# Run the CLI only when executed as a script; main()'s return value becomes
# the process exit code.
if __name__ == "__main__":
    raise SystemExit(main())